[asterisk-commits] dhubbard: branch dhubbard/named_processors r108891 - in /team/dhubbard/named_...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Mar 14 22:11:11 CDT 2008
Author: dhubbard
Date: Fri Mar 14 22:11:10 2008
New Revision: 108891
URL: http://svn.digium.com/view/asterisk?view=rev&rev=108891
Log:
get rid of task.c and task.h
Removed:
team/dhubbard/named_processors/include/asterisk/task.h
team/dhubbard/named_processors/main/task.c
Modified:
team/dhubbard/named_processors/apps/app_queue.c
team/dhubbard/named_processors/include/asterisk/taskprocessor.h
team/dhubbard/named_processors/main/Makefile
team/dhubbard/named_processors/main/taskprocessor.c
team/dhubbard/named_processors/res/res_testobserver.c
team/dhubbard/named_processors/res/sandbox/simobject.c
team/dhubbard/named_processors/res/sandbox/taskconsumer.c
Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Fri Mar 14 22:11:10 2008
@@ -93,7 +93,6 @@
#include "asterisk/strings.h"
#include "asterisk/global_datastores.h"
#include "asterisk/taskprocessor.h"
-#include "asterisk/task.h"
/*!
* \par Please read before modifying this file.
@@ -791,10 +790,10 @@
}
sc->state = state;
strcpy(sc->dev, device);
- t = get_available_task(handle_statechange, sc, "app_queue-device_state_cb");
+ t = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb");
if (tpsp->queue_task(tpsp, t) < 0) {
ast_log(LOG_WARNING, "queue_task failed!!\n");
- release_task(t);
+ ast_task_free(t);
}
}
Modified: team/dhubbard/named_processors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/include/asterisk/taskprocessor.h?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/include/asterisk/taskprocessor.h (original)
+++ team/dhubbard/named_processors/include/asterisk/taskprocessor.h Fri Mar 14 22:11:10 2008
@@ -23,6 +23,17 @@
#ifndef __taskprocessor_h__
#define __taskprocessor_h__
+
+/*
+ * a unit of work that is queued to a task processor. Structures are handed
+ * out by ast_task_alloc() and given back with ast_task_free().
+ */
+struct a_task {
+ /* callback run for this task; ast_task_alloc() stores its task_exe argument here */
+ int (* execute)(struct a_task* t);
+ /* payload passed to ast_task_alloc(); ast_task_free() releases it with ast_free() */
+ void *_datap;
+ /* NOTE(review): presumably the size of the _datap payload; nothing in this change sets it -- confirm */
+ size_t _datapsize;
+ /* NOTE(review): presumably the object that produced the task -- confirm against the users of this field */
+ void* _p_producer;
+ /* simobject.c stores its consumer pointer here; NOTE(review): confirm the intended meaning */
+ void* _p_consumer;
+ /* NOTE(review): presumably a creation or queue time; nothing in this change sets it -- confirm */
+ time_t _timestamp;
+ /* label copied from the src argument of ast_task_alloc(); used in log messages */
+ char _source[256];
+ /* list linkage, used while the task sits in the task pool */
+ AST_LIST_ENTRY(a_task) list;
+};
struct taskprocessor_singleton_stats {
unsigned long _max_qsize;
@@ -56,6 +67,22 @@
unsigned char _evtq_poll_thread_run;
+/*
+ * get an available a_task structure from the pool of available
+ * structures. If no structures are available, allocate one.
+ * task_exe and datap are stored in the task and src is copied into it.
+ * Returns NULL if the pool cannot be locked, the pool has been stopped,
+ * or allocation fails.
+ */
+struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src);
+/* number of a_task structures currently held in the pool */
+int task_pool_size(void);
+/*
+ * release a_task structure back to the pool. The task's data pointer,
+ * if set, is freed with ast_free() and the structure is zeroed first.
+ */
+int ast_task_free(struct a_task* task);
+
+/* mark the pool as stopped: tasks are no longer handed out or taken back */
+int stop_taskpool(void);
+/* free every a_task structure still held in the pool */
+int destroy_task_pool(void);
+
+/* task callback that only logs a debug message and returns 0 */
+int noop_task_execute(struct a_task* t);
+
int create_taskprocessor_singleton(const char* name, void* (*func)(void*));
int start_taskprocessor_singleton(const char* name);
int size_of_taskprocessor_singleton_list(void);
Modified: team/dhubbard/named_processors/main/Makefile
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/Makefile?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/main/Makefile (original)
+++ team/dhubbard/named_processors/main/Makefile Fri Mar 14 22:11:10 2008
@@ -30,7 +30,7 @@
cryptostub.o sha1.o http.o fixedjitterbuf.o abstract_jb.o \
strcompat.o threadstorage.o dial.o event.o adsistub.o audiohook.o \
astobj2.o hashtab.o global_datastores.o $(RESAMPLE_OBJS) version.o \
- features.o task.o taskprocessor.o
+ features.o taskprocessor.o
# we need to link in the objects statically, not as a library, because
# otherwise modules will not have them available if none of the static
Modified: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (original)
+++ team/dhubbard/named_processors/main/taskprocessor.c Fri Mar 14 22:11:10 2008
@@ -1,7 +1,7 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 2007, Dwayne M. Hubbard
+ * Copyright (C) 2007-2008, Dwayne M. Hubbard
*
* Dwayne M. Hubbard <dhubbard at digium.com>
*
@@ -21,7 +21,6 @@
#include <asterisk.h>
#include <asterisk/astobj2.h>
#include <asterisk/cli.h>
-#include <asterisk/task.h>
#include <asterisk/taskprocessor.h>
#include <signal.h>
#include <sys/time.h>
@@ -30,24 +29,27 @@
#define DEFAULT_POLL_FREQUENCY 1
+static int _global_killflag = 0;
AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
-static int _global_killflag = 0;
-
+
+static int _global_clireg = 0;
AST_MUTEX_DEFINE_STATIC(_global_clireg_lock);
-static int _global_clireg = 0;
pthread_attr_t _attribute[100];
AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
static int _taskprocessor_singletons_list_size = 0;
+
+static long _task_pool_size = 0;
+AST_LIST_HEAD_STATIC(_task_pool, a_task);
+
+static int _global_kill_taskpool = 0;
+AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
static void destroy_taskprocessor_singleton(void *tps);
static int remove_taskprocessor_singleton(const char* name);
static int taskprocessor_ping(struct a_task* e);
-/*********
- * CLI
- *********/
static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -55,6 +57,141 @@
AST_CLI_DEFINE(cli_taskprocessor_ping, "Ping a named task processors"),
AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
};
+
+/*
+ * task callback that does no work: it logs a debug message naming the
+ * source recorded in the task (or a placeholder when no task was
+ * supplied) and reports success
+ */
+int noop_task_execute(struct a_task* t)
+{
+	const char *source = "<task-structure-missing>";
+
+	if (t) {
+		source = t->_source;
+	}
+	ast_log(LOG_DEBUG, "noop for \'%s\'\n", source);
+	return 0;
+}
+
+/*
+ * get an available a_task structure from the pool of available
+ * structures. If no structures are available, allocate one
+ *
+ * task_exe: callback stored in the task's execute member
+ * datap:    stored in the task's _datap member; ast_task_free() releases
+ *           it with ast_free()
+ * src:      label copied into the task's _source member, truncated to fit
+ *           and always NUL-terminated; NULL is treated as ""
+ *
+ * returns the initialized task, or NULL when the pool cannot be locked,
+ * the pool has been stopped, or allocation fails
+ */
+struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src)
+{
+	struct a_task *t = NULL;
+	const char *source = src ? src : "";
+	int lock_failures = 0;
+
+	/*
+	 * the list lock is taken first, then the kill-flag lock is only tried;
+	 * on failure the list lock is dropped before retrying
+	 */
+	AST_LIST_LOCK(&_task_pool);
+	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&_task_pool);
+		usleep(1);
+		if (lock_failures > 10) {
+			ast_log(LOG_ERROR, "\n\tcannot lock task pool for \'%s\'.\n", source);
+			return NULL;
+		}
+		AST_LIST_LOCK(&_task_pool);
+	}
+	/* If the kill_taskpool flag is TRUE, then we should not give any more pooled tasks to requestors */
+	if (_global_kill_taskpool) {
+		ast_mutex_unlock(&_global_kill_taskpool_lock);
+		AST_LIST_UNLOCK(&_task_pool);
+		ast_log(LOG_NOTICE, "task pool is no longer available to requestors.\n");
+		return NULL;
+	}
+	if (AST_LIST_EMPTY(&_task_pool)) {
+		/* nothing to recycle: fall back to a fresh, zeroed structure */
+		t = ast_calloc(1, sizeof(*t));
+		if (!t) {
+			ast_log(LOG_ERROR, "Poop. ast_calloc failed to get a task for \'%s\'\n", source);
+		}
+	} else {
+		t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+		_task_pool_size -= 1;
+	}
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	AST_LIST_UNLOCK(&_task_pool);
+
+	if (t) {
+		t->execute = task_exe;
+		t->_datap = datap;
+		/*
+		 * strncpy() does not terminate the destination when the source
+		 * fills it, so copy one byte less and terminate explicitly
+		 */
+		strncpy(t->_source, source, sizeof(t->_source) - 1);
+		t->_source[sizeof(t->_source) - 1] = '\0';
+	}
+	return t;
+}
+
+/*
+ * release a_task structure back to the pool
+ *
+ * The task's _datap payload, if set, is released with ast_free() and the
+ * structure is zeroed before it is appended to the pool. When the pool
+ * cannot be locked, or has been stopped, the structure itself is freed
+ * instead of being dropped on the floor. A NULL task is ignored.
+ *
+ * always returns 0
+ */
+int ast_task_free(struct a_task* t)
+{
+	int lock_failures = 0;
+
+	if (!t) {
+		return 0;
+	}
+	if (t->_datap) {
+		ast_free(t->_datap);
+	}
+	memset(t, 0, sizeof(*t));
+	AST_LIST_LOCK(&_task_pool);
+	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&_task_pool);
+		usleep(1);
+		if (lock_failures > 10) {
+			ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
+			/* the task cannot be pooled; free it so it is not leaked */
+			ast_free(t);
+			return 0;
+		}
+		AST_LIST_LOCK(&_task_pool);
+	}
+	if (_global_kill_taskpool != 0) {
+		ast_log(LOG_NOTICE, "task pool is no longer available for task releasing\n");
+		ast_mutex_unlock(&_global_kill_taskpool_lock);
+		AST_LIST_UNLOCK(&_task_pool);
+		/* a stopped pool takes nothing back; free the task so it is not leaked */
+		ast_free(t);
+		return 0;
+	}
+	AST_LIST_INSERT_TAIL(&_task_pool, t, list);
+	_task_pool_size += 1;
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	AST_LIST_UNLOCK(&_task_pool);
+	return 0;
+}
+
+/*
+ * stop the task pool
+ *
+ * sets the kill flag, under its lock, that ast_task_alloc() and
+ * ast_task_free() check before touching the pool. Tasks already in the
+ * pool are not freed here. Always returns 0.
+ */
+int stop_taskpool(void)
+{
+	ast_mutex_lock(&_global_kill_taskpool_lock);
+	_global_kill_taskpool = 1;
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	return 0;
+}
+
+/*
+ * report how many a_task structures are currently held in the pool
+ *
+ * the counter is read while holding the pool list lock
+ */
+int task_pool_size(void)
+{
+	int size;
+
+	AST_LIST_LOCK(&_task_pool);
+	/* the counter is kept as a long; the narrowing to int is deliberate */
+	size = (int) _task_pool_size;
+	AST_LIST_UNLOCK(&_task_pool);
+	return size;
+}
+
+/*
+ * destroy a_task pool: free every a_task structure still held in the
+ * pool, adjusting the pool size counter for each one
+ *
+ * always returns 0
+ */
+int destroy_task_pool(void)
+{
+	struct a_task *task;
+
+	AST_LIST_LOCK(&_task_pool);
+	/* AST_LIST_REMOVE_HEAD() yields NULL once the list is empty */
+	while ((task = AST_LIST_REMOVE_HEAD(&_task_pool, list))) {
+		ast_free(task);
+		_task_pool_size -= 1;
+	}
+	AST_LIST_UNLOCK(&_task_pool);
+	return 0;
+}
+
static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
@@ -92,14 +229,14 @@
return CLI_SUCCESS;
}
- t = get_available_task(taskprocessor_ping, 0, "cli_taskprocessor_ping");
+ t = ast_task_alloc(taskprocessor_ping, 0, "cli_taskprocessor_ping");
if (!t) {
ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
return RESULT_SUCCESS;
}
if (push_task_to_taskprocessor_singleton(p, t) < 0) {
ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
- release_task(t);
+ ast_task_free(t);
}
return RESULT_SUCCESS;
}
@@ -146,14 +283,12 @@
static void* default_taskprocessor_thread_function(void* data)
{
- struct taskprocessor_singleton_info* i = NULL;
- struct a_task* t = NULL;
- unsigned long size = 0;
- unsigned long task_count = 0;
- long qsize = 0;
+ struct taskprocessor_singleton_info* i;
+ struct a_task* t;
+ long size;
+ struct timeval tv;
int killflag = 0;
struct timespec ts = { 0, };
- struct timeval tv;
i = (struct taskprocessor_singleton_info*)data;
if (!i) {
@@ -166,12 +301,12 @@
/* stuff is in the queue */
t = pop_task_from_taskprocessor_singleton(i);
if (!t) {
- ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", qsize);
+ ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", size);
continue;
}
if (!t->execute) {
ast_log(LOG_ERROR, "task is missing its execute callback.\n");
- release_task(t);
+ ast_task_free(t);
continue;
}
if (t->execute(t) < 0) {
@@ -183,10 +318,9 @@
if (size > i->_stats->_max_qsize) {
i->_stats->_max_qsize = size;
}
- task_count = i->_stats->_tasks_processed_count;
}
ast_mutex_unlock(&i->_taskprocessor_lock);
- release_task(t);
+ ast_task_free(t);
t = NULL;
ast_mutex_lock(&_global_killflag_lock);
@@ -210,7 +344,7 @@
/* stuff is in the queue */
t = pop_task_from_taskprocessor_singleton(i);
if (t) {
- release_task(t);
+ ast_task_free(t);
t = NULL;
}
}
@@ -444,7 +578,7 @@
lock_failures++;
AST_LIST_UNLOCK(&tp->_queue);
usleep(1);
- if (lock_failures > 10000) {
+ if (lock_failures > 10) {
ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
return -1;
}
@@ -468,7 +602,7 @@
lock_failures++;
AST_LIST_UNLOCK(&tp->_queue);
usleep(1);
- if (lock_failures > 10000) {
+ if (lock_failures > 10) {
ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
return t;
}
@@ -493,7 +627,7 @@
lock_failures++;
AST_LIST_UNLOCK(&tp->_queue);
usleep(1);
- if (lock_failures > 10000) {
+ if (lock_failures > 10) {
ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
return size;
}
Modified: team/dhubbard/named_processors/res/res_testobserver.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/res_testobserver.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/res/res_testobserver.c (original)
+++ team/dhubbard/named_processors/res/res_testobserver.c Fri Mar 14 22:11:10 2008
@@ -49,7 +49,6 @@
#include <asterisk/options.h>
#include <asterisk/config.h>
#include <asterisk/astobj2.h>
-#include <asterisk/task.h>
#include <asterisk/taskprocessor.h>
#include "sandbox/include/sandbox.h"
@@ -165,7 +164,7 @@
}
if (!t->execute) {
ast_log(LOG_ERROR, "task is missing its execute callback.\n");
- release_task(t);
+ ast_task_free(t);
continue;
}
if (t->execute(t) < 0) {
@@ -180,7 +179,7 @@
task_count = i->_stats->_tasks_processed_count;
}
ast_mutex_unlock(&i->_taskprocessor_lock);
- release_task(t);
+ ast_task_free(t);
t = NULL;
ast_mutex_lock(&_global_killflag_lock);
@@ -206,7 +205,7 @@
/* stuff is in the queue */
t = pop_task_from_taskprocessor_singleton(i);
if (t) {
- release_task(t);
+ ast_task_free(t);
t = NULL;
}
}
Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Fri Mar 14 22:11:10 2008
@@ -16,7 +16,6 @@
* at the top of the source tree.
*/
#include <asterisk.h>
-#include <asterisk/task.h>
#include <asterisk/taskprocessor.h>
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
@@ -125,7 +124,7 @@
struct a_task* t = NULL;
if (s && s->_producer) {
- t = get_available_task(simobject_taskhandler, 0, "start_simobject");
+ t = ast_task_alloc(simobject_taskhandler, 0, "start_simobject");
if (!t) {
ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
return -1;
@@ -134,7 +133,7 @@
t->_p_consumer = s->_consumer;
if (s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) < 0)) {
ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task! Releasing task back to task pool\n", s->_name);
- release_task(t);
+ ast_task_free(t);
return -1;
}
ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
@@ -147,7 +146,7 @@
struct a_task* t = NULL;
if (s && s->_producer) {
- t = get_available_task(simobject_taskhandler, 0, "start_simproducer");
+ t = ast_task_alloc(simobject_taskhandler, 0, "start_simproducer");
if (!t) {
ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
return -1;
@@ -230,7 +229,7 @@
ast_log(LOG_ERROR, "Huh? we have no private simobject pointer? This is bad!\n");
break;
}
- t = get_available_task(simobject_taskhandler, 0, "_simproducer_thread_function");
+ t = ast_task_alloc(simobject_taskhandler, 0, "_simproducer_thread_function");
if (!t) {
ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
break;
@@ -242,7 +241,7 @@
} else {
ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)! Releasing task to task pool\n"
, s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
- release_task(t);
+ ast_task_free(t);
break;
}
ast_mutex_lock(&i->_taskprocessor_lock);
Modified: team/dhubbard/named_processors/res/sandbox/taskconsumer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskconsumer.c?view=diff&rev=108891&r1=108890&r2=108891
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskconsumer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskconsumer.c Fri Mar 14 22:11:10 2008
@@ -22,7 +22,6 @@
* its event.
*/
#include <asterisk.h>
-#include <asterisk/task.h>
#include <asterisk/taskprocessor.h>
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
More information about the asterisk-commits
mailing list