[asterisk-commits] dhubbard: branch group/taskprocessors r112819 - in /team/group/taskprocessors...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Apr 4 14:07:02 CDT 2008


Author: dhubbard
Date: Fri Apr  4 14:07:01 2008
New Revision: 112819

URL: http://svn.digium.com/view/asterisk?view=rev&rev=112819
Log:
Taskprocessors are now stored in ao2 containers, but work is still required to properly complete this effort.  Right now, refcounts will never reach 0 on the taskprocessors; a fix for that is coming soon.

Modified:
    team/group/taskprocessors/include/asterisk/taskprocessor.h
    team/group/taskprocessors/main/pbx.c
    team/group/taskprocessors/main/taskprocessor.c

Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=112819&r1=112818&r2=112819
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Fri Apr  4 14:07:01 2008
@@ -69,7 +69,7 @@
 int ast_task_free(struct ast_task *task);
 
 /*! \brief Obtain a taskprocessor reference */
-struct ast_taskprocessor *ast_taskprocessor_get(const char *name, void *(*custom_func)(void*), enum ast_tps_reftype create);
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_reftype create);
 /*! \brief Release a taskprocessor reference */
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
 

Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=112819&r1=112818&r2=112819
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Fri Apr  4 14:07:01 2008
@@ -7966,8 +7966,9 @@
 
 	/* Initialize the PBX */
 	ast_verb(1, "Asterisk PBX Core Initializing\n");
+	taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
+
 	ast_verb(1, "Registering builtin applications:\n");
-	
 	ast_cli_register_multiple(pbx_cli, sizeof(pbx_cli) / sizeof(struct ast_cli_entry));
 	__ast_custom_function_register(&exception_function, NULL);
 
@@ -7982,8 +7983,6 @@
 	
 	/* Register manager application */
 	ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
-
-	taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
 			AST_EVENT_IE_END))) {

Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=112819&r1=112818&r2=112819
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Fri Apr  4 14:07:01 2008
@@ -24,12 +24,14 @@
  */
 
 #include "asterisk.h"
+#include "asterisk/module.h"
 #include "asterisk/time.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/cli.h"
 #include "asterisk/taskprocessor.h"
 #include "signal.h"
 #include "sys/time.h"
+#include "assert.h"
 
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
@@ -61,7 +63,7 @@
  *  a unique taskprocessor thread */
 struct ast_taskprocessor {
 	/*! \brief Friendly name of the taskprocessor */
-	char name[80];
+	char *name;
 	/*! \brief Thread poll condition */
 	ast_cond_t poll_cond;
 	/*! \brief Taskprocessor thread */
@@ -77,20 +79,22 @@
 	/*! \brief Taskprocessor current queue size */
 	long queue_size;
 	/*! \brief Taskprocessor queue */
-	AST_LIST_HEAD(queue, ast_task) queue;
+	AST_LIST_HEAD_NOLOCK(queue, ast_task) queue;
 	/*! \brief Taskprocessor singleton list entry */
 	AST_LIST_ENTRY(ast_taskprocessor) list;
 };
+#define TPS_MAX_BUCKETS 7
+static struct ao2_container *tps_singletons;
+ast_mutex_t tps_marshall;
 
 AST_LIST_HEAD_STATIC(taskprocessor_singletons, ast_taskprocessor);
-static int taskprocessor_singletons_list_size = 0;
 static ast_cond_t cli_ping_cond;
 static ast_mutex_t cli_ping_cond_lock;
 
+static int tps_hash_cb(const void *obj, const int flags);
+static int tps_cmp_cb(void *obj, void *arg, int flags);
 static void *tps_default_processor_function(void *data);
 static struct ast_taskprocessor *tps_default_constructor(void);
-static int tps_taskprocessor_add(struct ast_taskprocessor *t);
-static int tps_taskprocessor_count(void);
 static void tps_taskprocessor_destroy(void *tps);
 static int tps_taskprocessor_ping_handler(void *datap);
 
@@ -108,6 +112,10 @@
  */
 int ast_tps_init(void)
 {
+	tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb);
+	if (!tps_singletons)
+		return AST_MODULE_LOAD_FAILURE;
+
 	ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
 	return 0;
 }
@@ -151,19 +159,21 @@
 	int tklen;
 	int wordnum = 0;
 	char *name = NULL;
+	struct ao2_iterator i;
 
 	if (a->pos != 2)
 		return NULL;
 
 	tklen = strlen(a->word);
-	AST_LIST_LOCK(&taskprocessor_singletons);
-	AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
+	i = ao2_iterator_init(tps_singletons, 0);
+	while ((p = ao2_iterator_next(&i))) {
 		if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
 			name = ast_strdup(p->name);
+			ao2_ref(p, -1);
 			break;
 		}
-	}
-	AST_LIST_UNLOCK(&taskprocessor_singletons);
+		ao2_ref(p, -1);
+	}
 	return name;
 }
 
@@ -244,6 +254,7 @@
 	unsigned long maxqsize;
 	unsigned long processed;
 	struct ast_taskprocessor *p;
+	struct ao2_iterator i;
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -260,18 +271,16 @@
 		return CLI_SHOWUSAGE;
 
 	ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
-	AST_LIST_LOCK(&taskprocessor_singletons);
-	AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
-		ast_mutex_lock(&p->taskprocessor_lock);
+	i = ao2_iterator_init(tps_singletons, 0);
+	while ((p = ao2_iterator_next(&i))) {
 		ast_copy_string(name, p->name, sizeof(name));
 		qsize = p->queue_size;
 		maxqsize = p->stats->max_qsize;
 		processed = p->stats->_tasks_processed_count;
-		ast_mutex_unlock(&p->taskprocessor_lock);
 		ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
-	}
-	AST_LIST_UNLOCK(&taskprocessor_singletons);
-	ast_cli(a->fd, "\n\n");
+		ao2_ref(p, -1);
+	}
+	ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", ao2_container_count(tps_singletons));
 	return CLI_SUCCESS;	
 }
 
@@ -336,7 +345,6 @@
 	}
 	return NULL;
 }
-
 /*!
  * \note Taskprocessors are uniquely identified by name 
  */
@@ -356,7 +364,6 @@
 
 	return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH : 0;
 }
-
 /*! \brief The default taskprocessor constructor creates an initialized taskprocessor structure
  * \param poll_freq The polling frequency of the taskprocessor
  * \return ast_taskprocessor structure on success, NULL on error
@@ -393,43 +400,47 @@
  * \param custom_func the function executed by the taskprocessor thread
  * \return ast_taskprocessor pointer on success, NULL on error
  */
-struct ast_taskprocessor *ast_taskprocessor_get(const char *name, void *(*custom_func)(void*), enum ast_tps_reftype create)
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_reftype create)
 {
 	int rc;
-	struct ast_taskprocessor *p;
+	struct ast_taskprocessor *p, tmp_tps = {
+		.name = name,
+	};
 		
 	if (!name) {
 		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
 		return NULL;
 	}
-	AST_LIST_LOCK(&taskprocessor_singletons);
-	AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
-		if (!strcasecmp(p->name, name)) {
-			if ((create == TPS_REF_DEF) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
-				ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
-				AST_LIST_UNLOCK(&taskprocessor_singletons);
-				return NULL;
-			}
-			ao2_ref(p, 1);
-			AST_LIST_UNLOCK(&taskprocessor_singletons);
-			return p;
+	ast_mutex_lock(&tps_marshall);
+	p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
+	if (p) {
+		if ((create == TPS_REF_DEF) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
+			ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
+			ao2_ref(p, -1);
+			ast_mutex_unlock(&tps_marshall);
+			return NULL;
 		}
+		ast_mutex_unlock(&tps_marshall);
+		return p;
 	}
 	if (create == TPS_REF_IF_EXISTS) {
-		/* Reference a taskprocessor only if it already exists.  It does not. */
-		AST_LIST_UNLOCK(&taskprocessor_singletons);
+		/* calling function does not want us to create a new taskprocessor */
+		ast_mutex_unlock(&tps_marshall);
 		return NULL;
 	}
 	/* create a new taskprocessor */
 	if ((p = tps_default_constructor()) == NULL) {
-		ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
-		AST_LIST_UNLOCK(&taskprocessor_singletons);
-		return NULL;
-	}
-	ast_copy_string(p->name, name, sizeof(p->name));
+		ast_log(LOG_ERROR, "The default taskprocessor object could not be created for reference \'%s\'\n", name);
+		ast_mutex_unlock(&tps_marshall);
+		return NULL;
+	}
+	p->name = ast_strdup(name);
 	p->poll_thread_run = 1;
-	/* reducing the ast_pthread_create() blocks below into a single block, with:
-	 * ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
+
+	/* reducing the ast_pthread_create() blocks below into a single block, to something like:
+	 *
+	 * 		ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
+	 *
 	 * will result in uglier 'core show threads' output and you won't know if the default
 	 * processing function was used or not. */
 	if (custom_func) {
@@ -440,16 +451,18 @@
 		rc = ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p);
 	}
 	if (rc < 0) {
-		ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->name);
-		AST_LIST_UNLOCK(&taskprocessor_singletons);
-		return ast_taskprocessor_unreference(p);
-	}
-	if (tps_taskprocessor_add(p) < 0) {
-		ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\'\n", p->name);
-		AST_LIST_UNLOCK(&taskprocessor_singletons);
-		return ast_taskprocessor_unreference(p);
-	}
-	AST_LIST_UNLOCK(&taskprocessor_singletons);
+		ast_log(LOG_ERROR, "Taskprocessor \'%s\' failed to create the processing thread.\n", p->name);
+		ao2_ref(p, -1);
+		ast_mutex_unlock(&tps_marshall);
+		return NULL;
+	}
+	if (!(ao2_link(tps_singletons, p))) {
+		ast_log(LOG_ERROR, "Failed to add taskprocessor \'%s\' to container\n", p->name);
+		ao2_ref(p, -1);
+		ast_mutex_unlock(&tps_marshall);
+		return NULL;
+	}
+	ast_mutex_unlock(&tps_marshall);
 	return p;
 }
 
@@ -468,12 +481,12 @@
 static void tps_taskprocessor_destroy(void *tps)
 {
 	struct ast_taskprocessor *t = tps;
-
+	
+	assert(tps);
 	ast_debug(5, "destroying taskprocessor \'%s\'\n", t->name);
-	AST_LIST_LOCK(&taskprocessor_singletons);
-	AST_LIST_REMOVE(&taskprocessor_singletons, t, list);
-	taskprocessor_singletons_list_size--;
-	ast_debug(5, "taskprocessor_singleton \'%s\' removed.\n", t->name);
+	ao2_unlink(tps_singletons, t);
+
+	ast_debug(5, "taskprocessor \'%s\' unlinked from tps_singletons\n", t->name);
 	ast_mutex_lock(&t->taskprocessor_lock);
 	t->poll_thread_run = 0;
 	ast_cond_signal(&t->poll_cond);
@@ -484,28 +497,8 @@
 		ast_free(t->stats);
 		t->stats = NULL;
 	}
-	AST_LIST_UNLOCK(&taskprocessor_singletons);
+	ast_free(t->name);
 	return;
-}
-
-/*! \brief Add a new ast_taskprocessor structure to the taskprocessor container
- * \param t ast_taskprocessor structure to add to the taskprocessor container
- * \return 0 on success, -1 on error
- */
-static int tps_taskprocessor_add(struct ast_taskprocessor *t)
-{
-	AST_LIST_INSERT_TAIL(&taskprocessor_singletons, t, list);
-	taskprocessor_singletons_list_size++;
-	return 0;
-}
-
-/*! \brief Return the number of taskprocessors in the taskprocessor container
- * \param void
- * \return The number of taskprocessors in the taskprocessor container
- */
-static int tps_taskprocessor_count(void)
-{
-	return taskprocessor_singletons_list_size;
 }
 
 /*! \brief Push a task into the taskprocessor queue
@@ -515,26 +508,11 @@
  */
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t)
 {
-	int lock_failures = 0;
-
-	if (!tps || !t) {
-		ast_log(LOG_ERROR, "A \'%s\' is required for this function.\n", (tps)?"task":"taskprocessor");
-		return -1;
-	}
-	AST_LIST_LOCK(&tps->queue);
-	while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
-		lock_failures++;
-		AST_LIST_UNLOCK(&tps->queue);
-		usleep(1);
-		if (lock_failures > 10) {
-			ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
-			return -1;
-		}
-		AST_LIST_LOCK(&tps->queue);
-	}
+	assert(t);
+	assert(tps);
+	ast_mutex_lock(&tps->taskprocessor_lock);
 	AST_LIST_INSERT_TAIL(&tps->queue, t, list);
 	tps->queue_size++;
-	AST_LIST_UNLOCK(&tps->queue);
 	ast_cond_signal(&tps->poll_cond);
 	ast_mutex_unlock(&tps->taskprocessor_lock);
 	return 0;
@@ -546,26 +524,15 @@
  */
 struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps)
 {
-	struct ast_task *t = NULL;
-	int lock_failures = 0;
-
-	AST_LIST_LOCK(&tps->queue);
-	while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
-		lock_failures++;
-		AST_LIST_UNLOCK(&tps->queue);
-		usleep(1);
-		if (lock_failures > 10) {
-			ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
-			return t;
-		}
-		AST_LIST_LOCK(&tps->queue);
-	}
-	if (!AST_LIST_EMPTY(&tps->queue)) {
-		t = AST_LIST_REMOVE_HEAD(&tps->queue, list);
+	struct ast_task *t;
+
+	assert(tps);
+	assert(tps->name);
+	ast_mutex_lock(&tps->taskprocessor_lock);
+	if ((t = AST_LIST_REMOVE_HEAD(&tps->queue, list))) {
 		tps->queue_size--;
 	}
 	ast_mutex_unlock(&tps->taskprocessor_lock);
-	AST_LIST_UNLOCK(&tps->queue);
 	return t;
 }
 
@@ -575,23 +542,11 @@
  */
 int ast_taskprocessor_depth(struct ast_taskprocessor *tps)
 {
-	int size = -1;
-	int lock_failures = 0;
-
-	AST_LIST_LOCK(&tps->queue);
-	while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
-		lock_failures++;
-		AST_LIST_UNLOCK(&tps->queue);
-		usleep(1);
-		if (lock_failures > 10) {
-			ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
-			return size;
-		}
-		AST_LIST_LOCK(&tps->queue);
-	}
+	int size;
+
+	assert(tps);
+	assert(tps->name);
 	size = tps->queue_size;
-	ast_mutex_unlock(&tps->taskprocessor_lock);
-	AST_LIST_UNLOCK(&tps->queue);
 	return size;
 }
 




More information about the asterisk-commits mailing list