[svn-commits] dhubbard: branch dhubbard/named_processors r107992 - in /team/dhubbard/named_...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Wed Mar 12 01:40:40 CDT 2008
Author: dhubbard
Date: Wed Mar 12 01:40:40 2008
New Revision: 107992
URL: http://svn.digium.com/view/asterisk?view=rev&rev=107992
Log:
This is the initial proof of concept integration of named taskprocessors into app_queue.
I did this with Mark Michelson and a lot of changes were made. A lot more changes are coming.
Modified:
team/dhubbard/named_processors/apps/app_queue.c
team/dhubbard/named_processors/res/res_testobserver.c
team/dhubbard/named_processors/res/sandbox/include/task.h
team/dhubbard/named_processors/res/sandbox/include/taskproducer.h
team/dhubbard/named_processors/res/sandbox/simobject.c
team/dhubbard/named_processors/res/sandbox/task.c
team/dhubbard/named_processors/res/sandbox/taskconsumer.c
team/dhubbard/named_processors/res/sandbox/taskprocessor.c
team/dhubbard/named_processors/res/sandbox/taskproducer.c
Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Wed Mar 12 01:40:40 2008
@@ -92,6 +92,9 @@
#include "asterisk/astobj2.h"
#include "asterisk/strings.h"
#include "asterisk/global_datastores.h"
+#include "asterisk/task.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/taskproducer.h"
/*!
* \par Please read before modifying this file.
@@ -130,6 +133,9 @@
{ QUEUE_STRATEGY_LINEAR, "linear" },
{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
};
+
+struct taskproducer *tpsp;
+struct taskprocessor_singleton_info *tpsi;
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
@@ -674,7 +680,7 @@
* Lock interface list find sc, iterate through each queues queue_member list for member to
* update state inside queues
*/
-static void *handle_statechange(struct statechange *sc)
+static int handle_statechange(struct a_task* task)
{
struct call_queue *q;
struct member *cur;
@@ -683,13 +689,16 @@
struct ao2_iterator queue_iter;
char *loc;
char *technology;
+ struct statechange *sc = task->_datap;
+
+ ast_log(LOG_NOTICE, "device %s received state change %s\n", sc->dev, devstate2str(sc->state));
technology = ast_strdupa(sc->dev);
loc = strchr(technology, '/');
if (loc) {
*loc++ = '\0';
} else {
- return NULL;
+ return 0;
}
AST_LIST_LOCK(&interfaces);
@@ -708,7 +717,7 @@
if (!curint) {
ast_debug(3, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state));
- return NULL;
+ return 0;
}
ast_debug(1, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state));
@@ -755,7 +764,7 @@
ao2_unlock(q);
}
- return NULL;
+ return 0;
}
/*! \brief Data used by the device state thread */
@@ -774,64 +783,13 @@
.thread = AST_PTHREADT_NULL,
};
-/*! \brief Consumer of the statechange queue */
-static void *device_state_thread(void *data)
-{
- struct statechange *sc = NULL;
-
- while (!device_state.stop) {
- ast_mutex_lock(&device_state.lock);
- if (!(sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry))) {
- ast_cond_wait(&device_state.cond, &device_state.lock);
- sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry);
- }
- ast_mutex_unlock(&device_state.lock);
-
- /* Check to see if we were woken up to see the request to stop */
- if (device_state.stop)
- break;
-
- if (!sc)
- continue;
-
- handle_statechange(sc);
-
- ast_free(sc);
- sc = NULL;
- }
-
- if (sc)
- ast_free(sc);
-
- while ((sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry)))
- ast_free(sc);
-
- return NULL;
-}
-
-/*! \brief Producer of the statechange queue */
-static int statechange_queue(const char *dev, enum ast_device_state state)
-{
- struct statechange *sc;
-
- if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(dev) + 1)))
- return 0;
-
- sc->state = state;
- strcpy(sc->dev, dev);
-
- ast_mutex_lock(&device_state.lock);
- AST_LIST_INSERT_TAIL(&device_state.state_change_q, sc, entry);
- ast_cond_signal(&device_state.cond);
- ast_mutex_unlock(&device_state.lock);
-
- return 0;
-}
-
static void device_state_cb(const struct ast_event *event, void *unused)
{
enum ast_device_state state;
const char *device;
+ struct statechange *sc;
+ struct a_task* t;
+ size_t datapsize;
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -840,8 +798,20 @@
ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
return;
}
-
- statechange_queue(device, state);
+ datapsize = sizeof(*sc) + strlen(device) + 1;
+ if (!(sc = ast_calloc(1, datapsize))) {
+ ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
+ return;
+ }
+ sc->state = state;
+ strcpy(sc->dev, device);
+ t = get_available_task(handle_statechange, sc, datapsize, "app_queue-device_state_cb");
+ if (tpsp->queue_task(tpsp, t) < 0) {
+ ast_log(LOG_WARNING, "queue_task failed!!\n");
+ release_task(t);
+ }
+ ast_log(LOG_NOTICE, "queued statechange for %s\n", device);
+ ast_free(sc); /* maybe alloca ?? */
}
/*! \brief allocate space for new queue member and set fields based on parameters passed */
@@ -6276,7 +6246,6 @@
ast_mutex_init(&device_state.lock);
ast_cond_init(&device_state.cond, NULL);
- ast_pthread_create(&device_state.thread, NULL, device_state_thread, NULL);
ast_cli_register_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry));
res = ast_register_application(app, queue_exec, synopsis, descrip);
@@ -6300,6 +6269,13 @@
res |= ast_custom_function_register(&queuememberlist_function);
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
+
+ if (!exists_taskprocessor_singleton("app_queue")) {
+ create_taskprocessor_singleton("app_queue", 0);
+ }
+ tpsi = get_taskprocessor_singleton("app_queue");
+ tpsp = construct_taskproducer(tpsi);
+
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
res = -1;
Modified: team/dhubbard/named_processors/res/res_testobserver.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/res_testobserver.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/res_testobserver.c (original)
+++ team/dhubbard/named_processors/res/res_testobserver.c Wed Mar 12 01:40:40 2008
@@ -39,15 +39,16 @@
#include <unistd.h>
#include <string.h>
-#include "asterisk/file.h"
-#include "asterisk/logger.h"
-#include "asterisk/channel.h"
-#include "asterisk/pbx.h"
-#include "asterisk/module.h"
-#include "asterisk/lock.h"
-#include "asterisk/app.h"
-#include "asterisk/options.h"
-#include "asterisk/config.h"
+#include <asterisk/file.h>
+#include <asterisk/logger.h>
+#include <asterisk/channel.h>
+#include <asterisk/pbx.h>
+#include <asterisk/module.h>
+#include <asterisk/lock.h>
+#include <asterisk/app.h>
+#include <asterisk/options.h>
+#include <asterisk/config.h>
+#include <asterisk/astobj2.h>
#include "sandbox/include/sandbox.h"
#include "sandbox/include/configuration.h"
Modified: team/dhubbard/named_processors/res/sandbox/include/task.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/task.h?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/task.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/task.h Wed Mar 12 01:40:40 2008
@@ -27,14 +27,13 @@
#define __task_h__
struct a_task {
- unsigned long _id;
- char _source[256];
+ int (* execute)(struct a_task* t);
+ void *_datap;
+ size_t _datapsize;
void* _p_producer;
void* _p_consumer;
-
- int (* execute)(struct a_task* t);
-
time_t _timestamp;
+ char _source[256];
AST_LIST_ENTRY(a_task) list;
};
@@ -42,8 +41,7 @@
* get an available a_task structure from the pool of available
* structures. If no structures are available, allocate one
*/
-struct a_task* get_available_task(unsigned long id, char* src);
-struct a_task* get_available_task_with_handler(unsigned long id, char* src, int (*f)(struct a_task* t));
+struct a_task* get_available_task(int (*task_exe)(struct a_task *task), void* datap, size_t size, char* src);
int task_pool_size(void);
/*
* release a_task structure back to the pool
@@ -53,7 +51,6 @@
int stop_taskpool(void);
int destroy_task_pool(void);
-int invalid_task_execute(struct a_task* t);
int noop_task_execute(struct a_task* t);
#endif
Modified: team/dhubbard/named_processors/res/sandbox/include/taskproducer.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/taskproducer.h?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/taskproducer.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/taskproducer.h Wed Mar 12 01:40:40 2008
@@ -23,13 +23,12 @@
#define __taskproducer_h__
struct taskproducer {
- void* _owner;
struct taskprocessor_singleton_info* _taskprocessor;
unsigned long _tasks_produced;
int (* queue_task)(struct taskproducer* producer, struct a_task* task);
};
-struct taskproducer* construct_taskproducer(void* owner, struct taskprocessor_singleton_info* processor);
+struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor);
#endif
Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Wed Mar 12 01:40:40 2008
@@ -42,7 +42,7 @@
}
memset(s, 0, sizeof(struct simobject));
snprintf(s->_name, sizeof(s->_name), "%s", name);
- s->_producer = construct_taskproducer(s, t);
+ s->_producer = construct_taskproducer(t);
s->_consumer = construct_taskconsumer(s, t);
s->start = start_simobject;
s->stop = stop_simobject;
@@ -70,7 +70,7 @@
memset(s, 0, sizeof(struct simobject));
snprintf(s->_name, sizeof(s->_name), "%s", name);
snprintf(tpg, sizeof(tpg), "%s-generator", s->_name);
- s->_producer = construct_taskproducer(s, t);
+ s->_producer = construct_taskproducer(t);
s->_consumer = construct_taskconsumer(s, t);
if (!exists_taskprocessor_singleton(tpg)) {
if (create_taskprocessor_singleton(tpg, _simproducer_thread_function) < 0) {
@@ -124,20 +124,19 @@
struct a_task* t = NULL;
if (s && s->_producer) {
- t = get_available_task(SIM_PRODUCER_EVENT, "start_simobject");
+ t = get_available_task(simobject_taskhandler, 0, 0, "start_simobject");
if (!t) {
ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
return -1;
}
t->_p_producer = s->_producer;
t->_p_consumer = s->_consumer;
- t->execute = simobject_taskhandler;
if (s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) < 0)) {
ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task! Releasing task back to task pool\n", s->_name);
release_task(t);
return -1;
}
- ast_log(LOG_DEBUG, "simobject \'%s\' queued task: 0x%ld to processor \'%s\'\n", s->_name, (unsigned long)t->_id, s->_producer->_taskprocessor->_name);
+ ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
}
return 0;
}
@@ -147,19 +146,18 @@
struct a_task* t = NULL;
if (s && s->_producer) {
- t = get_available_task(SIM_PRODUCER_SLEEP, "start_simproducer");
+ t = get_available_task(simobject_taskhandler, 0, 0, "start_simproducer");
if (!t) {
ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
return -1;
}
t->_p_producer = s->_producer;
t->_p_consumer = s->_consumer;
- t->execute = simobject_taskhandler;
if (s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) < 0)) {
ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task!\n", s->_name);
return -1;
}
- ast_log(LOG_DEBUG, "simobject \'%s\' queued task: 0x%ld to processor \'%s\'\n", s->_name, (unsigned long)t->_id, s->_producer->_taskprocessor->_name);
+ ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
}
return 0;
}
@@ -176,26 +174,11 @@
static int simobject_taskhandler(struct a_task* e)
{
- int rc = 0;
if (!e) {
ast_log(LOG_ERROR, "Huh? No event!!\n");
return -1;
}
- switch (e->_id) {
- case SIM_PRODUCER_EVENT:
- ast_log(LOG_DEBUG, "[SIM_PRODUCER_EVENT] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
- , e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
- break;
- case SIM_PRODUCER_SLEEP:
- ast_log(LOG_DEBUG, "[SIM_PRODUCER_SLEEP] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
- , e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
- break;
- default:
- ast_log(LOG_WARNING, "[UNHANDLED TASK: 0x%ld] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
- , (unsigned long)e->_id, e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
- rc = -1;
- }
- return rc;
+ return 0;
}
void* _simproducer_thread_function(void* data)
@@ -203,9 +186,6 @@
struct taskprocessor_singleton_info* i = NULL;
struct simobject* s = NULL;
struct a_task* t = NULL;
- //unsigned long max_qsize = 0;
- //unsigned long task_count = 0;
- //long qsize = 0;
int killflag = 0;
int freq = 1;
struct timespec ts = { 0, };
@@ -235,7 +215,6 @@
freq = i->_poll_freq;
s = (struct simobject*)i->_private;
ast_mutex_unlock(&i->_taskprocessor_lock);
- //ast_log(LOG_NOTICE, "producer: 0x%ld, consumer: 0x%ld, taskgenerator: 0x%ld\n", (unsigned long)s->_producer, (unsigned long)s->_consumer, (unsigned long)s->_taskgenerator);
while (!killflag) {
@@ -245,25 +224,23 @@
*
* start by sleeping for the poll frequency, then produce a task
*/
- //usleep((rand()%10)+freq);
- usleep(1000);
+ usleep((rand()%10)+freq);
if (!s) {
ast_log(LOG_ERROR, "Huh? we have no private simobject pointer? This is bad!\n");
break;
}
- t = get_available_task(SIM_PRODUCER_SLEEP, "_simproducer_thread_function");
+ t = get_available_task(simobject_taskhandler, 0, 0, "_simproducer_thread_function");
if (!t) {
ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
break;
}
t->_p_producer = s->_producer;
t->_p_consumer = s->_consumer;
- t->execute = simobject_taskhandler;
if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) == 0)) {
- ast_log(LOG_DEBUG, "simobject \'%s\' queued task: 0x%ld to processor \'%s\'\n", s->_name, (unsigned long)t->_id, s->_consumer->_taskprocessor->_name);
+ ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_consumer->_taskprocessor->_name);
} else {
- ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task 0x%ld (producer: 0x%ld, generator: 0x%ld)! Releasing task to task pool\n"
- , s->_name, (unsigned long)t->_id, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
+ ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)! Releasing task to task pool\n"
+ , s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
release_task(t);
break;
}
Modified: team/dhubbard/named_processors/res/sandbox/task.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/task.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/task.c (original)
+++ team/dhubbard/named_processors/res/sandbox/task.c Wed Mar 12 01:40:40 2008
@@ -22,17 +22,9 @@
AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
static int _global_kill_taskpool = 0;
-static struct a_task* __get_available_task(unsigned long id, char* src, int (*f)(struct a_task* t));
-
-int invalid_task_execute(struct a_task* t)
-{
- ast_log(LOG_ERROR, "Unable to execute task \'%d\' for \'%s\' - No execute() handler available\n", (t)?(int)t->_id:-1, (t)?t->_source:"<task-structure-missing>");
- return -1;
-}
-
int noop_task_execute(struct a_task* t)
{
- ast_log(LOG_DEBUG, "noop (task \'%d\') for \'%s\'\n", (t)?(int)t->_id:-1, (t)?t->_source:"<task-structure-missing>");
+ ast_log(LOG_DEBUG, "noop for \'%s\'\n", (t)?t->_source:"<task-structure-missing>");
return 0;
}
@@ -40,28 +32,22 @@
* get an available a_task structure from the pool of available
* structures. If no structures are available, allocate one
*/
-struct a_task* get_available_task(unsigned long id, char* src)
+struct a_task* get_available_task(int (*task_exe)(struct a_task *task), void* datap, size_t size, char* src)
{
- return __get_available_task(id, src, 0);
-}
+ struct a_task *t;
+ void *tdata;
+ int lock_failures = 0;
-
-struct a_task* get_available_task_with_handler(unsigned long id, char* src, int (*f)(struct a_task* t))
-{
- return __get_available_task(id, src, f);
-}
-
-static struct a_task* __get_available_task(unsigned long id, char* src, int (*f)(struct a_task* t))
-{
- struct a_task* t = NULL;
- int lock_failures = 0;
-
+ tdata = ast_calloc(1, size);
+ if (tdata) {
+ memcpy(tdata, datap, size);
+ }
AST_LIST_LOCK(&_task_pool);
while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
lock_failures++;
AST_LIST_UNLOCK(&_task_pool);
usleep(1);
- if (lock_failures > 10000) {
+ if (lock_failures > 10) {
ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
return NULL;
}
@@ -75,17 +61,17 @@
return NULL;
}
if (AST_LIST_EMPTY(&_task_pool)) {
- t = (void*)malloc(sizeof(struct a_task));
- if (t) {
- memset(t, 0, sizeof(struct a_task));
- if (f == 0) {
- t->execute = &invalid_task_execute;
- } else {
- t->execute = f;
- }
- t->_id = id;
- strncpy(t->_source, src, sizeof(t->_source));
+ t = ast_calloc(1,sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "Poop. ast_calloc failed to get a task for \'%s\'\n", src);
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ return NULL;
}
+ t->execute = task_exe;
+ t->_datap = tdata;
+ t->_datapsize = size;
+ strncpy(t->_source, src, sizeof(t->_source));
ast_mutex_unlock(&_global_kill_taskpool_lock);
AST_LIST_UNLOCK(&_task_pool);
return t;
@@ -95,12 +81,9 @@
ast_mutex_unlock(&_global_kill_taskpool_lock);
AST_LIST_UNLOCK(&_task_pool);
if (t) {
- if (f == 0) {
- t->execute = &invalid_task_execute;
- } else {
- t->execute = f;
- }
- t->_id = id;
+ t->execute = task_exe;
+ t->_datap = tdata;
+ t->_datapsize = size;
strncpy(t->_source, src, sizeof(t->_source));
}
return t;
@@ -132,6 +115,9 @@
int release_task(struct a_task* t)
{
int lock_failures = 0;
+ if (t->_datap) {
+ free(t->_datap);
+ }
memset(t, 0, sizeof(struct a_task));
AST_LIST_LOCK(&_task_pool);
while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
Modified: team/dhubbard/named_processors/res/sandbox/taskconsumer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskconsumer.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskconsumer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskconsumer.c Wed Mar 12 01:40:40 2008
@@ -28,9 +28,8 @@
/* the default handler function is a failure */
static int invalid_handle_task(struct taskconsumer* consumer, struct a_task* task)
{
- ast_log(LOG_ERROR, "handle_task() not overidden. consumer: 0x%ld, task: 0x%ld, tasksource: \'%s\'\n"
+ ast_log(LOG_ERROR, "handle_task() not overidden. consumer: 0x%ld, tasksource: \'%s\'\n"
, (unsigned long)consumer
- , (task)?(unsigned long)task->_id:0
, (task)?task->_source:"unknown");
return -1;
}
Modified: team/dhubbard/named_processors/res/sandbox/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskprocessor.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskprocessor.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskprocessor.c Wed Mar 12 01:40:40 2008
@@ -26,13 +26,17 @@
#include "include/taskdefinitions.h"
#define DEFAULT_POLL_FREQUENCY 1
+
+AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
+static int _global_killflag = 0;
+
pthread_attr_t _attribute[100];
AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
static int _taskprocessor_singletons_list_size = 0;
static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
static int remove_taskprocessor_singleton(const char* name);
-static int taskprocessor_taskhandler(struct a_task* e);
+static int taskprocessor_ping(struct a_task* e);
/*********
* CLI
@@ -81,12 +85,11 @@
return CLI_SUCCESS;
}
- t = get_available_task(TASKPROCESSOR_CLI_PING, "cli_taskprocessor_ping");
+ t = get_available_task(taskprocessor_ping, 0, 0, "cli_taskprocessor_ping");
if (!t) {
ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
return RESULT_SUCCESS;
}
- t->execute = taskprocessor_taskhandler;
if (push_task_to_taskprocessor_singleton(p, t) < 0) {
ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
release_task(t);
@@ -134,24 +137,89 @@
return RESULT_SUCCESS;
}
-static int taskprocessor_taskhandler(struct a_task* e)
-{
- int rc = 0;
-
+static void* default_taskprocessor_thread_function(void* data)
+{
+ struct taskprocessor_singleton_info* i = NULL;
+ struct a_task* t = NULL;
+ unsigned long size = 0;
+ unsigned long task_count = 0;
+ long qsize = 0;
+ int killflag = 0;
+ struct timespec ts = { 0, };
+ struct timeval tv;
+
+ i = (struct taskprocessor_singleton_info*)data;
+ if (!i) {
+ ast_log(LOG_ERROR, "cannot start thread_function loop without a taskprocessor_singleton_info structure.\n");
+ return NULL;
+ }
+
+ while ((!killflag) && (i->_poll_thread_run)) {
+ if ((size = size_of_taskprocessor_singleton_queue(i)) > 0) {
+ /* stuff is in the queue */
+ t = pop_task_from_taskprocessor_singleton(i);
+ if (!t) {
+ ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", qsize);
+ continue;
+ }
+ if (!t->execute) {
+ ast_log(LOG_ERROR, "task is missing its execute callback.\n");
+ release_task(t);
+ continue;
+ }
+ if (t->execute(t) < 0) {
+ ast_log(LOG_ERROR, "execute() returned failure.\n");
+ }
+ ast_mutex_lock(&i->_taskprocessor_lock);
+ if (i->_stats) {
+ i->_stats->_tasks_processed_count++;
+ if (size > i->_stats->_max_qsize) {
+ i->_stats->_max_qsize = size;
+ }
+ task_count = i->_stats->_tasks_processed_count;
+ }
+ ast_mutex_unlock(&i->_taskprocessor_lock);
+ release_task(t);
+ t = NULL;
+
+ ast_mutex_lock(&_global_killflag_lock);
+ killflag = _global_killflag;
+ ast_mutex_unlock(&_global_killflag_lock);
+ if (--size) continue;
+ }
+ if (!killflag) {
+ tv = ast_tvadd(ast_tvnow(), ast_samp2tv(i->_poll_freq, 1));
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000;
+ ast_log(LOG_DEBUG, "\'%s\' is waiting for a signal (or timeout)\n", i->_name);
+ ast_mutex_lock(&i->_taskprocessor_lock);
+ ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
+ ast_mutex_unlock(&i->_taskprocessor_lock);
+ }
+ ast_mutex_lock(&_global_killflag_lock);
+ killflag = _global_killflag;
+ ast_mutex_unlock(&_global_killflag_lock);
+ }
+ while (size_of_taskprocessor_singleton_queue(i)) {
+ /* stuff is in the queue */
+ t = pop_task_from_taskprocessor_singleton(i);
+ if (t) {
+ release_task(t);
+ t = NULL;
+ }
+ }
+ i->_is_purged = TRUE;
+ return NULL;
+}
+
+static int taskprocessor_ping(struct a_task* e)
+{
if (!e) {
ast_log(LOG_ERROR, "Huh? No event!!\n");
return -1;
}
- switch (e->_id) {
- case TASKPROCESSOR_CLI_PING:
- ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
- break;
- default:
- ast_log(LOG_WARNING, "[UNHANDLED TASK: 0x%ld] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
- , (unsigned long)e->_id, e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
- rc = -1;
- }
- return rc;
+ ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
+ return 0;
}
int register_sandbox_taskprocessor_clis(void)
@@ -229,7 +297,7 @@
pthread_kill(p->_poll_thread, SIGURG);
} else {
/* create it */
- if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func, p) < 0) {
+ if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func?func:default_taskprocessor_thread_function, p) < 0) {
ast_mutex_unlock(&p->_taskprocessor_lock);
ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
return -1;
Modified: team/dhubbard/named_processors/res/sandbox/taskproducer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskproducer.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskproducer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskproducer.c Wed Mar 12 01:40:40 2008
@@ -28,29 +28,22 @@
static int default_queue_task(struct taskproducer* producer, struct a_task* task)
{
if ((!producer) || (!task)) {
- ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n"
- , (unsigned long)producer
- , (unsigned long)task
- );
+ ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n", (unsigned long)producer, (unsigned long)task);
return -1;
}
if (push_task_to_taskprocessor_singleton(producer->_taskprocessor, task) < 0) {
- ast_log(LOG_ERROR, "we failed to push task: 0x%ld to taskprocessor \'%s\'.\n"
- , (unsigned long)task->_id
- , (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>"
- );
+ ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>");
return -1;
}
producer->_tasks_produced++;
ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
ast_cond_signal(&producer->_taskprocessor->_poll_cond);
ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
- ast_log(LOG_DEBUG, "task: 0x%ld pushed to \'%s\' and signal sent\n", task->_id, producer->_taskprocessor->_name);
return 0;
}
/* create and initialize a task producer */
-struct taskproducer* construct_taskproducer(void* owner, struct taskprocessor_singleton_info* processor)
+struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor)
{
struct taskproducer* p = NULL;
p = malloc(sizeof(struct taskproducer));
@@ -59,11 +52,8 @@
return NULL;
}
memset(p, 0, sizeof(struct taskproducer));
- p->_owner = owner;
p->_taskprocessor = processor;
p->queue_task = default_queue_task;
- ast_log(LOG_DEBUG, "created default_taskproducer for owner: 0x%ld using taskprocessor \'%s\'\n"
- , (unsigned long)owner, (processor)?processor->_name:"<null>");
return p;
}
More information about the svn-commits
mailing list