[asterisk-commits] dhubbard: branch dhubbard/named_processors r109011 - in /team/dhubbard/named_...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sun Mar 16 22:51:09 CDT 2008
Author: dhubbard
Date: Sun Mar 16 22:51:08 2008
New Revision: 109011
URL: http://svn.digium.com/view/asterisk?view=rev&rev=109011
Log:
A lot of changes with interfaces, astobj2, code cleanup, and general stuff... res_testobserver, which is an unimportant sample client crashes when unloaded maybe due to simobject stuff which doesn't really matter either. I also want to send an eeep out to file in the land of canadians
Modified:
team/dhubbard/named_processors/apps/app_queue.c
team/dhubbard/named_processors/include/asterisk/taskprocessor.h
team/dhubbard/named_processors/main/taskprocessor.c
team/dhubbard/named_processors/res/res_testobserver.c
team/dhubbard/named_processors/res/sandbox/include/simobject.h
team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h
team/dhubbard/named_processors/res/sandbox/simobject.c
team/dhubbard/named_processors/res/sandbox/taskconsumer.c
Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Sun Mar 16 22:51:08 2008
@@ -6196,6 +6196,7 @@
clear_and_free_interfaces();
ao2_ref(queues, -1);
+ ao2_ref(tpsp, -1);
ao2_ref(tpsi, -1);
return res;
}
@@ -6242,11 +6243,8 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- if (!exists_taskprocessor_singleton("app_queue")) {
- create_taskprocessor_singleton("app_queue", 0);
- }
- tpsi = get_taskprocessor_singleton("app_queue");
- tpsp = construct_taskproducer(tpsi);
+ tpsi = ast_taskprocessor_alloc("app_queue", 0);
+ tpsp = ast_taskproducer_alloc(tpsi);
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
res = -1;
Modified: team/dhubbard/named_processors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/include/asterisk/taskprocessor.h?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/include/asterisk/taskprocessor.h (original)
+++ team/dhubbard/named_processors/include/asterisk/taskprocessor.h Sun Mar 16 22:51:08 2008
@@ -78,16 +78,14 @@
int stop_taskpool(void);
int destroy_task_pool(void);
-int noop_task_execute(struct a_task* t);
+int noop_task_execute(struct a_task *t);
-int create_taskprocessor_singleton(const char* name, void* (*func)(void*));
+struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*));
int size_of_taskprocessor_singleton_list(void);
-struct taskprocessor_singleton_info* get_taskprocessor_singleton(const char* name);
-int exists_taskprocessor_singleton(char *name);
int register_taskprocessor_clis(void);
int unregister_taskprocessor_clis(void);
-struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor);
+struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor);
#endif
Modified: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (original)
+++ team/dhubbard/named_processors/main/taskprocessor.c Sun Mar 16 22:51:08 2008
@@ -47,7 +47,7 @@
static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
static void destroy_taskprocessor_singleton(void *tps);
-static int remove_taskprocessor_singleton(const char* name);
+static void destroy_taskproducer(void *tp);
static int taskprocessor_ping(struct a_task* e);
static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -373,7 +373,7 @@
}
-static struct taskprocessor_singleton_info* construct_default_taskprocessor(int poll_freq)
+static struct taskprocessor_singleton_info* tps_default_constructor(int poll_freq)
{
struct taskprocessor_singleton_info* tps;
tps = ao2_alloc(sizeof(*tps), destroy_taskprocessor_singleton);
@@ -391,16 +391,16 @@
tps->_stats = ast_calloc(1, sizeof(*tps->_stats));
if (!tps->_stats) {
ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_stats structure.\n");
- ast_free(tps);
+ ao2_ref(tps, -1);
return NULL;
}
return tps;
}
-int create_taskprocessor_singleton(const char* name, void* (*func)(void*))
+struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*))
{
int index;
- struct taskprocessor_singleton_info* p = NULL;
+ struct taskprocessor_singleton_info *p;
ast_mutex_lock(&_global_clireg_lock);
if (!_global_clireg) {
@@ -408,31 +408,47 @@
_global_clireg = 1;
}
ast_mutex_unlock(&_global_clireg_lock);
-
- p = construct_default_taskprocessor(1);
- if (!p) {
+
+ if (!name) {
+ ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
+ return NULL;
+ }
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
+ if (!strcasecmp(p->_name, name)) {
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ ao2_ref(p, 1);
+ ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' already exists!.\n", p->_name);
+ return p;
+ }
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ if ((p = tps_default_constructor(DEFAULT_POLL_FREQUENCY)) == NULL) {
ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
- return -1;
+ return NULL;
}
snprintf(p->_name, sizeof(p->_name), "%s", name);
p->_poll_thread_run = 1;
if (add_taskprocessor_singleton(p) < 0) {
ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\' with ID: 0x%X\n", p->_name, (unsigned int)p->_id);
- return -1;
+ ao2_ref(p, -1);
+ return NULL;
}
index = size_of_taskprocessor_singleton_list();
- ast_log(LOG_DEBUG, "found taskprocessor %s at index %d\n", name, index);
+ ast_log(LOG_DEBUG, "creating taskprocessor \'%s\' at index %d\n", name, index);
pthread_attr_init(&_attribute[index]);
/* stay stopped if we are supposed to be stopped */
if (p->_poll_thread == AST_PTHREADT_STOP) {
ast_log(LOG_DEBUG, "poll thread == AST_PTHREADT_STOP.\n");
- return 0;
+ ao2_ref(p, -1);
+ return NULL;
}
ast_mutex_lock(&p->_taskprocessor_lock);
if (p->_poll_thread == pthread_self()) {
ast_mutex_unlock(&p->_taskprocessor_lock);
ast_log(LOG_DEBUG, "cannot kill myself.\n");
- return -1;
+ ao2_ref(p, -1);
+ return NULL;
}
if (p->_poll_thread != AST_PTHREADT_NULL) {
/* wake it up */
@@ -442,52 +458,47 @@
if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func?func:default_taskprocessor_thread_function, p) < 0) {
ast_mutex_unlock(&p->_taskprocessor_lock);
ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
- return -1;
+ ao2_ref(p, -1);
+ return NULL;
}
}
pthread_attr_destroy(&_attribute[index]);
ast_mutex_unlock(&p->_taskprocessor_lock);
- return 0;
+ return p;
}
static void destroy_taskprocessor_singleton(void *tps)
{
+ struct taskprocessor_singleton_info *n;
struct taskprocessor_singleton_info *t = (struct taskprocessor_singleton_info *)tps;
+
+ ast_log(LOG_DEBUG, "destroying taskprocessor \'%s\'\n", t->_name);
if (!t) {
ast_log(LOG_ERROR, "can't destruct a NULL taskprocessor_singleton.\n");
return;
}
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+ if (n == t) {
+ AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
+ _taskprocessor_singletons_list_size -= 1;
+ //AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' removed.\n", t->_name);
+ break;
+ }
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
t->_poll_thread_run = 0;
ast_mutex_lock(&t->_taskprocessor_lock);
ast_cond_signal(&t->_poll_cond);
ast_mutex_unlock(&t->_taskprocessor_lock);
pthread_join(t->_poll_thread, NULL);
t->_poll_thread = AST_PTHREADT_NULL;
-
- if (remove_taskprocessor_singleton(t->_name) < 0) {
- ast_log(LOG_WARNING, "cannot remove taskprocessor_singleton \'%s\'.\n", t->_name);
- }
if (t->_stats) {
ast_free(t->_stats);
t->_stats = NULL;
}
return;
-}
-
-int exists_taskprocessor_singleton(char *name)
-{
- int found=0;
- struct taskprocessor_singleton_info* n = NULL;
-
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
- if (!strcasecmp(n->_name, name)) {
- found=1;
- break;
- }
- }
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- return found;
}
static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
@@ -508,46 +519,6 @@
return 0;
}
-struct taskprocessor_singleton_info* get_taskprocessor_singleton(const char* name)
-{
- struct taskprocessor_singleton_info* n;
-
- if (!name) {
- ast_log(LOG_WARNING, "requesting a nameless taskprocessor!!!\n");
- return NULL;
- }
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
- if (!strcasecmp(n->_name, name)) {
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' located.\n", n->_name);
- return n;
- }
- }
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- ast_log(LOG_WARNING, "could not find the taskprocessor named \'%s\'\n", name);
- return NULL;
-}
-
-static int remove_taskprocessor_singleton(const char* name)
-{
- struct taskprocessor_singleton_info* n = NULL;
-
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
- if (!strcasecmp(n->_name, name)) {
- AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
- _taskprocessor_singletons_list_size -= 1;
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- ast_log(LOG_NOTICE, "taskprocessor_singleton \'%s\' removed.\n", name);
- return 0;
- }
- }
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- ast_log(LOG_WARNING, "did not find a taskprocessor_singleton \'%s\'\n", name);
- return -1;
-}
-
int size_of_taskprocessor_singleton_list(void)
{
int size;
@@ -557,30 +528,30 @@
return size;
}
-int ast_taskprocessor_push(struct taskprocessor_singleton_info* tp, struct a_task* t)
+int ast_taskprocessor_push(struct taskprocessor_singleton_info* tps, struct a_task* t)
{
int lock_failures = 0;
- if (!tp || !t) {
- ast_log(LOG_ERROR, "a taskprocessor (0x%ld) and a task (0x%ld) are required and missing.\n", (long)tp, (long)t);
+ if (!tps || !t) {
+ ast_log(LOG_ERROR, "a taskprocessor (0x%ld) and a task (0x%ld) are required and missing.\n", (long)tps, (long)t);
return -1;
}
- AST_LIST_LOCK(&tp->_queue);
- while (ast_mutex_trylock(&tp->_taskprocessor_lock)) {
+ AST_LIST_LOCK(&tps->_queue);
+ while (ast_mutex_trylock(&tps->_taskprocessor_lock)) {
lock_failures++;
- AST_LIST_UNLOCK(&tp->_queue);
+ AST_LIST_UNLOCK(&tps->_queue);
usleep(1);
if (lock_failures > 10) {
- ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+ ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->_name);
return -1;
}
- AST_LIST_LOCK(&tp->_queue);
- }
- AST_LIST_INSERT_TAIL(&tp->_queue, t, list);
- tp->_queue_size+=1;
- ast_cond_signal(&tp->_poll_cond);
- ast_mutex_unlock(&tp->_taskprocessor_lock);
- AST_LIST_UNLOCK(&tp->_queue);
+ AST_LIST_LOCK(&tps->_queue);
+ }
+ AST_LIST_INSERT_TAIL(&tps->_queue, t, list);
+ tps->_queue_size+=1;
+ ast_cond_signal(&tps->_poll_cond);
+ ast_mutex_unlock(&tps->_taskprocessor_lock);
+ AST_LIST_UNLOCK(&tps->_queue);
return 0;
}
@@ -650,15 +621,25 @@
}
/* create and initialize a task producer */
-struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor)
+struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor)
{
struct taskproducer* p;
- p = ast_calloc(1, sizeof(*p));
+ p = ao2_alloc(sizeof(*p), destroy_taskproducer);
if (!p) {
ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
return NULL;
}
p->_taskprocessor = processor;
+ ao2_ref(processor, 1);
p->queue_task = default_queue_task;
return p;
}
+
+static void destroy_taskproducer(void *tp)
+{
+ struct taskproducer *p;
+ p = (struct taskproducer *)tp;
+ ast_log(LOG_DEBUG, "destroying taskproducer\n");
+ ao2_ref(p->_taskprocessor, -1);
+}
+
Modified: team/dhubbard/named_processors/res/res_testobserver.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/res_testobserver.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/res_testobserver.c (original)
+++ team/dhubbard/named_processors/res/res_testobserver.c Sun Mar 16 22:51:08 2008
@@ -64,6 +64,7 @@
#define MAX_TEST_PROCESSORS 4
#define MAX_TEST_PRODUCERS 20
struct simobject* simprod[MAX_TEST_PRODUCERS];
+struct taskprocessor_singleton_info *simprocessors[MAX_TEST_PROCESSORS];
AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
static int _global_killflag = 0;
@@ -191,7 +192,6 @@
tv = ast_tvadd(ast_tvnow(), ast_samp2tv(i->_poll_freq, 1));
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
- ast_log(LOG_DEBUG, "\'%s\' is waiting for a signal (or timeout), loops: %ld\n", i->_name, loop_count);
ast_mutex_lock(&i->_taskprocessor_lock);
ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
ast_mutex_unlock(&i->_taskprocessor_lock);
@@ -216,21 +216,23 @@
static int unload_module(void)
{
int res, count;
- char tbuf[128];
- struct simobject* s = NULL;
+ struct simobject *s;
ast_log(LOG_NOTICE, "-- testobserver -- unloading.\n");
+ ast_log(LOG_NOTICE, "STOPPING %d PRODUCERS\n", MAX_TEST_PRODUCERS);
for (count=0; count < MAX_TEST_PRODUCERS; count++) {
- s = (struct simobject *)simprod[count];
- if ( s ) {
- s->stop(s);
- }
- }
+ s = simprod[count];
+ if (s && s->_producer) {
+ ao2_ref(s->_producer, -1);
+ }
+ if (s && s->_consumer) {
+ ao2_ref(s->_consumer, -1);
+ }
+ ao2_ref(s, -1);
+ }
+ ast_log(LOG_NOTICE, "STOPPING %d PROCESSORS\n", MAX_TEST_PROCESSORS);
for (count=0; count < MAX_TEST_PROCESSORS; count++) {
- snprintf(tbuf, sizeof(tbuf), "sillyprocessor-%d", count);
- if (exists_taskprocessor_singleton(tbuf)) {
- ao2_ref(get_taskprocessor_singleton(tbuf), -1);
- }
+ ao2_ref(simprocessors[count], -1);
}
ast_log(LOG_NOTICE, "-- testobserver -- unload complete.\n");
usleep(100);
@@ -243,33 +245,30 @@
{
int count;
char tbuf[128], tproc[128];
- struct simobject* s = NULL;
-
+ struct simobject* s;
+ struct taskprocessor_singleton_info *t;
+
+ ast_log(LOG_NOTICE, "CREATING %d PROCESSORS\n", MAX_TEST_PROCESSORS);
for (count=0; count < MAX_TEST_PROCESSORS; count++) {
snprintf(tbuf, sizeof(tbuf), "sillyprocessor-%d", count);
ast_log(LOG_DEBUG, "%s\n", tbuf);
- if (!exists_taskprocessor_singleton(tbuf)) {
- if (create_taskprocessor_singleton(tbuf, _evtq_poll_thread_function) < 0) {
- ast_log(LOG_ERROR, "can't create \'%s\' taskprocessor singleton\n", tbuf);
- return AST_MODULE_LOAD_DECLINE;
- }
- }
- }
+ t = ast_taskprocessor_alloc(tbuf, 0);
+ simprocessors[count] = t;
+ }
+ usleep(100);
+ ast_log(LOG_NOTICE, "CREATING %d PRODUCERS\n", MAX_TEST_PRODUCERS);
for (count=0; count < MAX_TEST_PRODUCERS; count++) {
snprintf(tbuf, sizeof(tbuf), "producer-%d", count);
snprintf(tproc, sizeof(tproc), "sillyprocessor-%d", count%MAX_TEST_PROCESSORS);
- ast_log(LOG_DEBUG, "%s will use %s\n", tbuf, tproc);
- simprod[count] = construct_simproducer(tbuf, tproc);
- }
-
- ast_log(LOG_NOTICE, "started %d processors, task pool size: %d\n"
- , size_of_taskprocessor_singleton_list(), ast_task_poolsize());
+ simprod[count] = (struct simobject *)ast_simproducer_alloc(tbuf, tproc);
+ }
+ ast_log(LOG_NOTICE, "started %d processors, task pool size: %d\n", size_of_taskprocessor_singleton_list(), ast_task_poolsize());
if (ast_register_application(app, app_exec, synopsis, descrip))
return AST_MODULE_LOAD_DECLINE;
for (count=0; count < MAX_TEST_PRODUCERS; count++) {
- s = simprod[count];
+ s = (struct simobject *)simprod[count];
if (s && s->start)
s->start(s);
}
Modified: team/dhubbard/named_processors/res/sandbox/include/simobject.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/simobject.h?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/simobject.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/simobject.h Sun Mar 16 22:51:08 2008
@@ -31,11 +31,10 @@
struct taskprocessor_singleton_info *_taskgenerator;
int (*start)(struct simobject *sim);
- int (*stop)(struct simobject *sim);
};
-struct simobject* construct_simobject(const char *name, const char *named_taskprocessor);
-struct simobject* construct_simproducer(const char *name, const char *named_taskprocessor);
-struct simobject* construct_simconsumer(const char *name, const char *named_taskprocessor);
+struct simobject* ast_simobject_alloc(const char *name, const char *named_taskprocessor);
+struct simobject* ast_simproducer_alloc(const char *name, const char *named_taskprocessor);
+struct simobject* ast_simconsumer_alloc(const char *name, const char *named_taskprocessor);
#endif
Modified: team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h Sun Mar 16 22:51:08 2008
@@ -22,12 +22,10 @@
#define __taskconsumer_h__
struct taskconsumer {
- void* _owner;
struct taskprocessor_singleton_info* _taskprocessor;
-
int (* handle_task)(struct taskconsumer* consumer, struct a_task* task);
};
-struct taskconsumer* construct_taskconsumer(void* owner, struct taskprocessor_singleton_info* processor);
+struct taskconsumer* ast_taskconsumer_alloc(struct taskprocessor_singleton_info* processor);
#endif
Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Sun Mar 16 22:51:08 2008
@@ -16,6 +16,7 @@
* at the top of the source tree.
*/
#include <asterisk.h>
+#include <asterisk/astobj2.h>
#include <asterisk/taskprocessor.h>
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
@@ -23,99 +24,105 @@
static int start_simobject(struct simobject* s);
static int start_simproducer(struct simobject* s);
-static int stop_simobject(struct simobject* s);
static int simobject_taskhandler(struct a_task* t);
static void* _simproducer_thread_function(void* data);
-
-struct simobject* construct_simobject(const char* name, const char* named_taskprocessor)
-{
- struct simobject* s = NULL;
- struct taskprocessor_singleton_info* t = NULL;
-
- t = get_taskprocessor_singleton(named_taskprocessor);
+static void destroy_simobject(void *sim);
+
+struct simobject* ast_simobject_alloc(const char* name, const char* named_taskprocessor)
+{
+ struct simobject *s;
+ struct taskprocessor_singleton_info *t;
+
+ t = ast_taskprocessor_alloc(named_taskprocessor, 0);
if (!t) {
- ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for simobject \'%s\' construction.\n"
- , named_taskprocessor, name);
- return NULL;
- }
- s = ast_calloc(1, sizeof(*s));
+ ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for simobject \'%s\' construction.\n", named_taskprocessor, name);
+ return NULL;
+ }
+ s = ao2_alloc(sizeof(*s), destroy_simobject);
if (!s) {
- ast_log(LOG_ERROR, "cannot allocate memory for a simobject structure.\n");
+ ast_log(LOG_ERROR, "Failed to allocate simobject \'%s\'\n", name);
return NULL;
}
snprintf(s->_name, sizeof(s->_name), "%s", name);
- s->_producer = construct_taskproducer(t);
- s->_consumer = construct_taskconsumer(s, t);
+ s->_producer = ast_taskproducer_alloc(t);
+ s->_consumer = ast_taskconsumer_alloc(t);
s->start = start_simobject;
- s->stop = stop_simobject;
- ast_log(LOG_DEBUG, "created simobject \'%s\' at 0x%ld using taskprocessor \'%s\'.\n", name, (unsigned long)s, named_taskprocessor);
return s;
}
-struct simobject* construct_simproducer(const char* name, const char* named_taskprocessor)
-{
- char tpg[256]; /*generator name, there is probably a definition for this size */
- struct simobject* s = NULL;
- struct taskprocessor_singleton_info* t = NULL;
-
- t = get_taskprocessor_singleton(named_taskprocessor);
+struct simobject* ast_simproducer_alloc(const char* name, const char* named_taskprocessor)
+{
+ char tpg[256];
+ struct simobject *s;
+ struct taskprocessor_singleton_info *t;
+
+ t = ast_taskprocessor_alloc(named_taskprocessor, 0);
if (!t) {
- ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for sim-producer \'%s\' construction.\n"
- , named_taskprocessor, name);
- return NULL;
- }
- s = ast_calloc(1, sizeof(*s));
+ ast_log(LOG_ERROR, "could not allocate taskprocessor \'%s\' for sim-producer \'%s\' construction.\n", named_taskprocessor, name);
+ return NULL;
+ }
+ s = ao2_alloc(sizeof(*s), destroy_simobject);
if (!s) {
- ast_log(LOG_ERROR, "cannot allocate memory for a simobject structure.\n");
+ ast_log(LOG_ERROR, "Failed to allocate simproducer \'%s\'\n", name);
return NULL;
}
snprintf(s->_name, sizeof(s->_name), "%s", name);
snprintf(tpg, sizeof(tpg), "%s-generator", s->_name);
- s->_producer = construct_taskproducer(t);
- s->_consumer = construct_taskconsumer(s, t);
- if (!exists_taskprocessor_singleton(tpg)) {
- if (create_taskprocessor_singleton(tpg, _simproducer_thread_function) < 0) {
- ast_log(LOG_ERROR, "can't create \'%s\' taskprocessor singleton\n", tpg);
- ast_free(s);
- return NULL;
- }
- }
- s->_taskgenerator = get_taskprocessor_singleton(tpg);
+ s->_producer = ast_taskproducer_alloc(t);
+ s->_consumer = ast_taskconsumer_alloc(t);
+ s->_taskgenerator = ast_taskprocessor_alloc(tpg, _simproducer_thread_function);
if (!s->_taskgenerator) {
ast_log(LOG_ERROR, "Huh? we don't have a task generator called \'%s\' ??\n", tpg);
- ast_free(s);
- return NULL;
- }
- ast_log(LOG_DEBUG, "simobject at 0x%ld (taskgenerator: 0x%ld) setting private to 0x%ld\n", (unsigned long)&s, (unsigned long)s->_taskgenerator, (unsigned long)&s);
+ ao2_ref(s, -1);
+ return NULL;
+ }
s->_taskgenerator->_private = s;
s->start = start_simproducer;
- s->stop = stop_simobject;
return s;
}
-struct simobject* construct_simconsumer(const char* name, const char* named_taskprocessor)
-{
- struct simobject* s = NULL;
- struct taskprocessor_singleton_info* t = NULL;
-
- t = get_taskprocessor_singleton(named_taskprocessor);
+struct simobject* ast_simconsumer_alloc(const char* name, const char* named_taskprocessor)
+{
+ struct simobject* s;
+ struct taskprocessor_singleton_info* t;
+
+ t = ast_taskprocessor_alloc(named_taskprocessor, 0);
if (!t) {
- ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for sim-consumer \'%s\' construction.\n"
- , named_taskprocessor, name);
- return NULL;
- }
- s = ast_calloc(1, sizeof(*s));
+ ast_log(LOG_ERROR, "could not allocate taskprocessor \'%s\' for sim-producer \'%s\' construction.\n", named_taskprocessor, name);
+ return NULL;
+ }
+ s = ao2_alloc(sizeof(*s), destroy_simobject);
if (!s) {
- ast_log(LOG_ERROR, "cannot allocate memory for a simobject structure.\n");
+ ast_log(LOG_ERROR, "Failed to allocate simconsumer \'%s\'\n", name);
return NULL;
}
snprintf(s->_name, sizeof(s->_name), "%s", name);
- s->_consumer = construct_taskconsumer(s, t);
+ s->_consumer = ast_taskconsumer_alloc(t);
s->start = start_simobject;
- s->stop = stop_simobject;
- ast_log(LOG_DEBUG, "created simobject \'%s\' with task production capabilities, using taskprocessor \'%s\'.\n", name, named_taskprocessor);
return s;
+}
+
+static void destroy_simobject(void *sim)
+{
+ struct simobject *s = (struct simobject *)sim;
+ if (s && s->_taskgenerator) {
+ ast_log(LOG_DEBUG, "simobject \'%s\' stopping\n", s->_name);
+ s->_taskgenerator->_poll_thread_run = 0;
+ ast_mutex_lock(&s->_taskgenerator->_taskprocessor_lock);
+ ast_cond_signal(&s->_taskgenerator->_poll_cond);
+ ast_mutex_unlock(&s->_taskgenerator->_taskprocessor_lock);
+ pthread_join(s->_taskgenerator->_poll_thread, NULL);
+ s->_taskgenerator->_poll_thread = AST_PTHREADT_NULL;
+ }
+ if (s->_producer) {
+ ao2_ref(s->_producer, -1);
+ }
+ if (s->_consumer) {
+ ao2_ref(s->_consumer, -1);
+ }
+ ast_log(LOG_DEBUG, "simobject destroyed!\n");
+ return;
}
@@ -136,7 +143,6 @@
ast_task_free(t);
return -1;
}
- ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
}
return 0;
}
@@ -157,20 +163,9 @@
ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task!\n", s->_name);
return -1;
}
- ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
}
return 0;
}
-
-int stop_simobject(struct simobject* s)
-{
- if (s && s->_taskgenerator) {
- ast_log(LOG_DEBUG, "simobject \'%s\' stopping\n", s->_name);
- s->_taskgenerator->_poll_thread_run = 0;
- }
- return 0;
-}
-
static int simobject_taskhandler(struct a_task* e)
{
@@ -209,7 +204,6 @@
ast_log(LOG_DEBUG, "waiting to start \'%s\'\n", i->_name);
} while (!i->_poll_thread_run);
-
ast_mutex_lock(&i->_taskprocessor_lock);
killflag = !i->_poll_thread_run;
freq = i->_poll_freq;
@@ -224,7 +218,6 @@
*
* start by sleeping for the poll frequency, then produce a task
*/
- usleep((rand()%10)+freq);
if (!s) {
ast_log(LOG_ERROR, "Huh? we have no private simobject pointer? This is bad!\n");
break;
@@ -236,19 +229,22 @@
}
t->_p_producer = s->_producer;
t->_p_consumer = s->_consumer;
- if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) == 0)) {
- ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_consumer->_taskprocessor->_name);
- } else {
+ if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t))) {
ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)! Releasing task to task pool\n"
, s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
ast_task_free(t);
break;
}
ast_mutex_lock(&i->_taskprocessor_lock);
+ ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
+ ast_mutex_unlock(&i->_taskprocessor_lock);
+ usleep(1);
+ ast_mutex_lock(&i->_taskprocessor_lock);
killflag = !i->_poll_thread_run;
ast_mutex_unlock(&i->_taskprocessor_lock);
}
i->_is_purged = 1;
+ ao2_ref(i, -1);
return NULL;
}
Modified: team/dhubbard/named_processors/res/sandbox/taskconsumer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskconsumer.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskconsumer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskconsumer.c Sun Mar 16 22:51:08 2008
@@ -22,35 +22,40 @@
* its event.
*/
#include <asterisk.h>
+#include <asterisk/astobj2.h>
#include <asterisk/taskprocessor.h>
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "include/taskconsumer.h"
+static void destroy_taskconsumer(void *tc);
/* the default handler function is a failure */
static int invalid_handle_task(struct taskconsumer* consumer, struct a_task* task)
{
- ast_log(LOG_ERROR, "handle_task() not overidden. consumer: 0x%ld, tasksource: \'%s\'\n"
- , (unsigned long)consumer
- , (task)?task->_source:"unknown");
+ ast_log(LOG_ERROR, "handle_task() not overidden. consumer: 0x%ld, tasksource: \'%s\'\n", (unsigned long)consumer, (task)?task->_source:"unknown");
return -1;
}
/* create and initialize the default consumer object */
-struct taskconsumer* construct_taskconsumer(void* owner, struct taskprocessor_singleton_info* processor)
+struct taskconsumer* ast_taskconsumer_alloc(struct taskprocessor_singleton_info* processor)
{
struct taskconsumer* c = NULL;
- c = ast_calloc(1, sizeof(*c));
+ c = ao2_alloc(sizeof(*c), destroy_taskconsumer);
if (!c) {
- ast_log(LOG_ERROR, "cannot allocate memory for a taskconsumer structure.\n");
+ ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
return NULL;
}
- c->_owner = owner;
c->_taskprocessor = processor;
c->handle_task = invalid_handle_task;
- ast_log(LOG_DEBUG, "created default_taskconsumer for owner: 0x%ld using taskprocessor \'%s\'\n"
- , (unsigned long)owner, (processor)?processor->_name:"<null>");
+ ast_log(LOG_DEBUG, "created taskconsumer using taskprocessor \'%s\'\n", (processor)?processor->_name:"<null>");
return c;
}
+static void destroy_taskconsumer(void *tc)
+{
+ struct taskconsumer *c;
+ ast_log(LOG_DEBUG, "destroying taskconsumer\n");
+ c = (struct taskconsumer *)tc;
+ ao2_ref(c->_taskprocessor, -1);
+}
More information about the asterisk-commits
mailing list