[svn-commits] dhubbard: branch group/taskprocessors r112819 - in /team/group/taskprocessors...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Fri Apr 4 14:07:02 CDT 2008
Author: dhubbard
Date: Fri Apr 4 14:07:01 2008
New Revision: 112819
URL: http://svn.digium.com/view/asterisk?view=rev&rev=112819
Log:
Taskprocessors are now stored in ao2 containers, but work is still required to properly complete this effort. Right now, refcounts will never reach 0 on the taskprocessors; a fix is coming soon.
Modified:
team/group/taskprocessors/include/asterisk/taskprocessor.h
team/group/taskprocessors/main/pbx.c
team/group/taskprocessors/main/taskprocessor.c
Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=112819&r1=112818&r2=112819
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Fri Apr 4 14:07:01 2008
@@ -69,7 +69,7 @@
int ast_task_free(struct ast_task *task);
/*! \brief Obtain a taskprocessor reference */
-struct ast_taskprocessor *ast_taskprocessor_get(const char *name, void *(*custom_func)(void*), enum ast_tps_reftype create);
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_reftype create);
/*! \brief Release a taskprocessor reference */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=112819&r1=112818&r2=112819
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Fri Apr 4 14:07:01 2008
@@ -7966,8 +7966,9 @@
/* Initialize the PBX */
ast_verb(1, "Asterisk PBX Core Initializing\n");
+ taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
+
ast_verb(1, "Registering builtin applications:\n");
-
ast_cli_register_multiple(pbx_cli, sizeof(pbx_cli) / sizeof(struct ast_cli_entry));
__ast_custom_function_register(&exception_function, NULL);
@@ -7982,8 +7983,6 @@
/* Register manager application */
ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
-
- taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
AST_EVENT_IE_END))) {
Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=112819&r1=112818&r2=112819
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Fri Apr 4 14:07:01 2008
@@ -24,12 +24,14 @@
*/
#include "asterisk.h"
+#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "signal.h"
#include "sys/time.h"
+#include "assert.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
@@ -61,7 +63,7 @@
* a unique taskprocessor thread */
struct ast_taskprocessor {
/*! \brief Friendly name of the taskprocessor */
- char name[80];
+ char *name;
/*! \brief Thread poll condition */
ast_cond_t poll_cond;
/*! \brief Taskprocessor thread */
@@ -77,20 +79,22 @@
/*! \brief Taskprocessor current queue size */
long queue_size;
/*! \brief Taskprocessor queue */
- AST_LIST_HEAD(queue, ast_task) queue;
+ AST_LIST_HEAD_NOLOCK(queue, ast_task) queue;
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
};
+#define TPS_MAX_BUCKETS 7
+static struct ao2_container *tps_singletons;
+ast_mutex_t tps_marshall;
AST_LIST_HEAD_STATIC(taskprocessor_singletons, ast_taskprocessor);
-static int taskprocessor_singletons_list_size = 0;
static ast_cond_t cli_ping_cond;
static ast_mutex_t cli_ping_cond_lock;
+static int tps_hash_cb(const void *obj, const int flags);
+static int tps_cmp_cb(void *obj, void *arg, int flags);
static void *tps_default_processor_function(void *data);
static struct ast_taskprocessor *tps_default_constructor(void);
-static int tps_taskprocessor_add(struct ast_taskprocessor *t);
-static int tps_taskprocessor_count(void);
static void tps_taskprocessor_destroy(void *tps);
static int tps_taskprocessor_ping_handler(void *datap);
@@ -108,6 +112,10 @@
*/
int ast_tps_init(void)
{
+ tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb);
+ if (!tps_singletons)
+ return AST_MODULE_LOAD_FAILURE;
+
ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
return 0;
}
@@ -151,19 +159,21 @@
int tklen;
int wordnum = 0;
char *name = NULL;
+ struct ao2_iterator i;
if (a->pos != 2)
return NULL;
tklen = strlen(a->word);
- AST_LIST_LOCK(&taskprocessor_singletons);
- AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
+ i = ao2_iterator_init(tps_singletons, 0);
+ while ((p = ao2_iterator_next(&i))) {
if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
name = ast_strdup(p->name);
+ ao2_ref(p, -1);
break;
}
- }
- AST_LIST_UNLOCK(&taskprocessor_singletons);
+ ao2_ref(p, -1);
+ }
return name;
}
@@ -244,6 +254,7 @@
unsigned long maxqsize;
unsigned long processed;
struct ast_taskprocessor *p;
+ struct ao2_iterator i;
switch (cmd) {
case CLI_INIT:
@@ -260,18 +271,16 @@
return CLI_SHOWUSAGE;
ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
- AST_LIST_LOCK(&taskprocessor_singletons);
- AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
- ast_mutex_lock(&p->taskprocessor_lock);
+ i = ao2_iterator_init(tps_singletons, 0);
+ while ((p = ao2_iterator_next(&i))) {
ast_copy_string(name, p->name, sizeof(name));
qsize = p->queue_size;
maxqsize = p->stats->max_qsize;
processed = p->stats->_tasks_processed_count;
- ast_mutex_unlock(&p->taskprocessor_lock);
ast_cli(a->fd, "\n%24s %17ld %12ld %12ld", name, processed, qsize, maxqsize);
- }
- AST_LIST_UNLOCK(&taskprocessor_singletons);
- ast_cli(a->fd, "\n\n");
+ ao2_ref(p, -1);
+ }
+ ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", ao2_container_count(tps_singletons));
return CLI_SUCCESS;
}
@@ -336,7 +345,6 @@
}
return NULL;
}
-
/*!
* \note Taskprocessors are uniquely identified by name
*/
@@ -356,7 +364,6 @@
return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH : 0;
}
-
/*! \brief The default taskprocessor constructor creates an initialized taskprocessor structure
* \param poll_freq The polling frequency of the taskprocessor
* \return ast_taskprocessor structure on success, NULL on error
@@ -393,43 +400,47 @@
* \param custom_func the function executed by the taskprocessor thread
* \return ast_taskprocessor pointer on success, NULL on error
*/
-struct ast_taskprocessor *ast_taskprocessor_get(const char *name, void *(*custom_func)(void*), enum ast_tps_reftype create)
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_reftype create)
{
int rc;
- struct ast_taskprocessor *p;
+ struct ast_taskprocessor *p, tmp_tps = {
+ .name = name,
+ };
if (!name) {
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
return NULL;
}
- AST_LIST_LOCK(&taskprocessor_singletons);
- AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
- if (!strcasecmp(p->name, name)) {
- if ((create == TPS_REF_DEF) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
- ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
- AST_LIST_UNLOCK(&taskprocessor_singletons);
- return NULL;
- }
- ao2_ref(p, 1);
- AST_LIST_UNLOCK(&taskprocessor_singletons);
- return p;
+ ast_mutex_lock(&tps_marshall);
+ p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
+ if (p) {
+ if ((create == TPS_REF_DEF) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
+ ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
+ ao2_ref(p, -1);
+ ast_mutex_unlock(&tps_marshall);
+ return NULL;
}
+ ast_mutex_unlock(&tps_marshall);
+ return p;
}
if (create == TPS_REF_IF_EXISTS) {
- /* Reference a taskprocessor only if it already exists. It does not. */
- AST_LIST_UNLOCK(&taskprocessor_singletons);
+ /* calling function does not want us to create a new taskprocessor */
+ ast_mutex_unlock(&tps_marshall);
return NULL;
}
/* create a new taskprocessor */
if ((p = tps_default_constructor()) == NULL) {
- ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
- AST_LIST_UNLOCK(&taskprocessor_singletons);
- return NULL;
- }
- ast_copy_string(p->name, name, sizeof(p->name));
+ ast_log(LOG_ERROR, "The default taskprocessor object could not be created for reference \'%s\'\n", name);
+ ast_mutex_unlock(&tps_marshall);
+ return NULL;
+ }
+ p->name = ast_strdup(name);
p->poll_thread_run = 1;
- /* reducing the ast_pthread_create() blocks below into a single block, with:
- * ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
+
+ /* reducing the ast_pthread_create() blocks below into a single block, to something like:
+ *
+ * ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
+ *
* will result in uglier 'core show threads' output and you won't know if the default
* processing function was used or not. */
if (custom_func) {
@@ -440,16 +451,18 @@
rc = ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p);
}
if (rc < 0) {
- ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->name);
- AST_LIST_UNLOCK(&taskprocessor_singletons);
- return ast_taskprocessor_unreference(p);
- }
- if (tps_taskprocessor_add(p) < 0) {
- ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\'\n", p->name);
- AST_LIST_UNLOCK(&taskprocessor_singletons);
- return ast_taskprocessor_unreference(p);
- }
- AST_LIST_UNLOCK(&taskprocessor_singletons);
+ ast_log(LOG_ERROR, "Taskprocessor \'%s\' failed to create the processing thread.\n", p->name);
+ ao2_ref(p, -1);
+ ast_mutex_unlock(&tps_marshall);
+ return NULL;
+ }
+ if (!(ao2_link(tps_singletons, p))) {
+ ast_log(LOG_ERROR, "Failed to add taskprocessor \'%s\' to container\n", p->name);
+ ao2_ref(p, -1);
+ ast_mutex_unlock(&tps_marshall);
+ return NULL;
+ }
+ ast_mutex_unlock(&tps_marshall);
return p;
}
@@ -468,12 +481,12 @@
static void tps_taskprocessor_destroy(void *tps)
{
struct ast_taskprocessor *t = tps;
-
+
+ assert(tps);
ast_debug(5, "destroying taskprocessor \'%s\'\n", t->name);
- AST_LIST_LOCK(&taskprocessor_singletons);
- AST_LIST_REMOVE(&taskprocessor_singletons, t, list);
- taskprocessor_singletons_list_size--;
- ast_debug(5, "taskprocessor_singleton \'%s\' removed.\n", t->name);
+ ao2_unlink(tps_singletons, t);
+
+ ast_debug(5, "taskprocessor \'%s\' unlinked from tps_singletons\n", t->name);
ast_mutex_lock(&t->taskprocessor_lock);
t->poll_thread_run = 0;
ast_cond_signal(&t->poll_cond);
@@ -484,28 +497,8 @@
ast_free(t->stats);
t->stats = NULL;
}
- AST_LIST_UNLOCK(&taskprocessor_singletons);
+ ast_free(t->name);
return;
-}
-
-/*! \brief Add a new ast_taskprocessor structure to the taskprocessor container
- * \param t ast_taskprocessor structure to add to the taskprocessor container
- * \return 0 on success, -1 on error
- */
-static int tps_taskprocessor_add(struct ast_taskprocessor *t)
-{
- AST_LIST_INSERT_TAIL(&taskprocessor_singletons, t, list);
- taskprocessor_singletons_list_size++;
- return 0;
-}
-
-/*! \brief Return the number of taskprocessors in the taskprocessor container
- * \param void
- * \return The number of taskprocessors in the taskprocessor container
- */
-static int tps_taskprocessor_count(void)
-{
- return taskprocessor_singletons_list_size;
}
/*! \brief Push a task into the taskprocessor queue
@@ -515,26 +508,11 @@
*/
int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t)
{
- int lock_failures = 0;
-
- if (!tps || !t) {
- ast_log(LOG_ERROR, "A \'%s\' is required for this function.\n", (tps)?"task":"taskprocessor");
- return -1;
- }
- AST_LIST_LOCK(&tps->queue);
- while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
- lock_failures++;
- AST_LIST_UNLOCK(&tps->queue);
- usleep(1);
- if (lock_failures > 10) {
- ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
- return -1;
- }
- AST_LIST_LOCK(&tps->queue);
- }
+ assert(t);
+ assert(tps);
+ ast_mutex_lock(&tps->taskprocessor_lock);
AST_LIST_INSERT_TAIL(&tps->queue, t, list);
tps->queue_size++;
- AST_LIST_UNLOCK(&tps->queue);
ast_cond_signal(&tps->poll_cond);
ast_mutex_unlock(&tps->taskprocessor_lock);
return 0;
@@ -546,26 +524,15 @@
*/
struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps)
{
- struct ast_task *t = NULL;
- int lock_failures = 0;
-
- AST_LIST_LOCK(&tps->queue);
- while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
- lock_failures++;
- AST_LIST_UNLOCK(&tps->queue);
- usleep(1);
- if (lock_failures > 10) {
- ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
- return t;
- }
- AST_LIST_LOCK(&tps->queue);
- }
- if (!AST_LIST_EMPTY(&tps->queue)) {
- t = AST_LIST_REMOVE_HEAD(&tps->queue, list);
+ struct ast_task *t;
+
+ assert(tps);
+ assert(tps->name);
+ ast_mutex_lock(&tps->taskprocessor_lock);
+ if ((t = AST_LIST_REMOVE_HEAD(&tps->queue, list))) {
tps->queue_size--;
}
ast_mutex_unlock(&tps->taskprocessor_lock);
- AST_LIST_UNLOCK(&tps->queue);
return t;
}
@@ -575,23 +542,11 @@
*/
int ast_taskprocessor_depth(struct ast_taskprocessor *tps)
{
- int size = -1;
- int lock_failures = 0;
-
- AST_LIST_LOCK(&tps->queue);
- while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
- lock_failures++;
- AST_LIST_UNLOCK(&tps->queue);
- usleep(1);
- if (lock_failures > 10) {
- ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
- return size;
- }
- AST_LIST_LOCK(&tps->queue);
- }
+ int size;
+
+ assert(tps);
+ assert(tps->name);
size = tps->queue_size;
- ast_mutex_unlock(&tps->taskprocessor_lock);
- AST_LIST_UNLOCK(&tps->queue);
return size;
}
More information about the svn-commits
mailing list