[asterisk-commits] dhubbard: branch group/taskprocessors r114818 - in /team/group/taskprocessors...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Apr 28 21:03:43 CDT 2008


Author: dhubbard
Date: Mon Apr 28 21:03:42 2008
New Revision: 114818

URL: http://svn.digium.com/view/asterisk?view=rev&rev=114818
Log:
ketchup

Modified:
    team/group/taskprocessors/include/asterisk/taskprocessor.h
    team/group/taskprocessors/main/taskprocessor.c

Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=114818&r1=114817&r2=114818
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Mon Apr 28 21:03:42 2008
@@ -47,17 +47,15 @@
 struct ast_task;
 struct ast_taskprocessor;
 
-/*! \brief ast_tps_reftype is used to specify whether a taskprocessor should be created
+/*! \brief ast_tps_options is used to specify whether a taskprocessor should be created
  * on an attempt to ast_taskprocessor_get() if the taskprocessor does not already exist.
  * The default behavior is to create a taskprocessor if it does not already exist and provide
  * a reference to the taskprocessor if it already exists. */
-enum ast_tps_reftype {
+enum ast_tps_options {
 	/*! \brief return a reference to a taskprocessor, create one if it does not exist */
-	TPS_REF_DEF = 0
+	TPS_REF_DEFAULT = 0,
 	/*! \brief return a reference to a taskprocessor ONLY if it already exists */
-	, TPS_REF_IF_EXISTS
-	/* add new entries above this comment */
-	, TPS_REF_ENUM_SIZE
+	TPS_REF_IF_EXISTS = (1 << 0),
 };
 
 /*! \brief Initialize the taskprocessor subsystem */
@@ -90,7 +88,7 @@
  * not already exist
  * return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the
  * TPS_REF_IF_EXISTS reference type is specified */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_reftype create);
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_options create);
 
 /*! \brief Unreference the specified taskprocessor and its reference count will decrement.
  * taskprocessors use astobj2 and will destroy themselves when their reference count reaches zero.

Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=114818&r1=114817&r2=114818
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Mon Apr 28 21:03:42 2008
@@ -97,12 +97,12 @@
 static void tps_taskprocessor_destroy(void *tps);
 static int tps_taskprocessor_ping_handler(void *datap);
 
-static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
-static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
 static struct ast_cli_entry taskprocessor_clis[] = {
-	AST_CLI_DEFINE(cli_taskprocessor_ping, "Ping a named task processors"),
-	AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
+	AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processors"),
+	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
 };
 
 /*! \brief Perform necessary taskprocessor subsystem initialization stuff
@@ -164,6 +164,7 @@
 		return NULL;
 
 	tklen = strlen(a->word);
+	ast_mutex_lock(&tps_marshall);
 	i = ao2_iterator_init(tps_singletons, 0);
 	while ((p = ao2_iterator_next(&i))) {
 		if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
@@ -173,6 +174,7 @@
 		}
 		ao2_ref(p, -1);
 	}
+	ast_mutex_unlock(&tps_marshall);
 	return name;
 }
 
@@ -194,10 +196,12 @@
  * \param a CLI arguments
  * \return CLI_SUCCESS on success, CLI_SHOWUSAGE on error 
  */
-static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	struct timeval begin, end, delta;
 	char *name;
+	struct timeval tv;
+	struct timespec ts;
 	struct ast_task *t = NULL;
 	struct ast_taskprocessor *tps = NULL;
 
@@ -221,22 +225,27 @@
 		return CLI_SUCCESS;
 	}
 	ast_cli(a->fd, "\npinging %s ...", name);
-	if (!(t = ast_task_alloc(tps_taskprocessor_ping_handler, 0, "cli_taskprocessor_ping"))) {
+	if (!(t = ast_task_alloc(tps_taskprocessor_ping_handler, 0, "cli_tps_ping"))) {
 		ast_cli(a->fd, "\n\tfailed to allocate a task\n\n");
+		ao2_ref(tps, -1);
 		return CLI_FAILURE;
 	}
-	begin = ast_tvnow();
+	tv = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
+	ts.tv_sec = tv.tv_sec;
+	ts.tv_nsec = tv.tv_usec * 1000;
 	ast_mutex_lock(&cli_ping_cond_lock);
 	if (ast_taskprocessor_push(tps, t) < 0) {
 		ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
 		ast_task_free(t);
+		ao2_ref(tps, -1);
 		return CLI_FAILURE;
 	}
-	ast_cond_wait(&cli_ping_cond, &cli_ping_cond_lock);
+	ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
 	ast_mutex_unlock(&cli_ping_cond_lock);
 	end = ast_tvnow();
 	delta = ast_tvsub(end, begin);
 	ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, delta.tv_sec, (long int)delta.tv_usec);
+	ao2_ref(tps, -1);
 	return CLI_SUCCESS;	
 }
 
@@ -246,9 +255,10 @@
  * \param a CLI arguments
  * \return CLI_SUCCESS on success, CLI_SHOWUSAGE on error
  */
-static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	char name[256];
+	int tcount;
 	unsigned long qsize;
 	unsigned long maxqsize;
 	unsigned long processed;
@@ -270,6 +280,7 @@
 		return CLI_SHOWUSAGE;
 
 	ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
+	ast_mutex_lock(&tps_marshall);
 	i = ao2_iterator_init(tps_singletons, 0);
 	while ((p = ao2_iterator_next(&i))) {
 		ast_copy_string(name, p->name, sizeof(name));
@@ -279,7 +290,9 @@
 		ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
 		ao2_ref(p, -1);
 	}
-	ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", ao2_container_count(tps_singletons));
+	tcount = ao2_container_count(tps_singletons); 
+	ast_mutex_unlock(&tps_marshall);
+	ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
 	return CLI_SUCCESS;	
 }
 
@@ -399,7 +412,7 @@
  * \param custom_func the function executed by the taskprocessor thread
  * \return ast_taskprocessor pointer on success, NULL on error
  */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_reftype create)
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_options create)
 {
 	int rc;
 	struct ast_taskprocessor *p, tmp_tps = {
@@ -413,7 +426,7 @@
 	ast_mutex_lock(&tps_marshall);
 	p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
 	if (p) {
-		if ((create == TPS_REF_DEF) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
+		if ((create == TPS_REF_DEFAULT) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
 			ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
 			ao2_ref(p, -1);
 			ast_mutex_unlock(&tps_marshall);
@@ -422,8 +435,8 @@
 		ast_mutex_unlock(&tps_marshall);
 		return p;
 	}
-	if (create == TPS_REF_IF_EXISTS) {
-		/* calling function does not want us to create a new taskprocessor */
+	if ((create & TPS_REF_IF_EXISTS) == TPS_REF_IF_EXISTS) {
+		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
 		ast_mutex_unlock(&tps_marshall);
 		return NULL;
 	}
@@ -440,8 +453,8 @@
 	 *
 	 * 		ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
 	 *
-	 * will result in uglier and less useful 'core show threads' output because you won't know if the default
-	 * processing function was used or not and the CLI output is going to make you sad */
+	 * will result in uglier and less useful 'core show threads' output and you won't know if the default
+	 * processing function was used or not. */
 	if (custom_func) {
 		p->poll_function = custom_func;
 		rc = ast_pthread_create(&p->poll_thread, NULL, custom_func, p);
@@ -468,7 +481,12 @@
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 {
 	if (tps) {
-		ao2_ref(tps, -1);
+		ast_mutex_lock(&tps_marshall);
+		ao2_unlink(tps_singletons, tps);
+		if (ao2_ref(tps, -1) > 1) {
+			ao2_link(tps_singletons, tps);
+		}
+		ast_mutex_unlock(&tps_marshall);
 	}
 	return NULL;
 }
@@ -485,12 +503,7 @@
 		ast_log(LOG_ERROR, "missing taskprocessor\n");
 		return;
 	}
-	ast_debug(5, "destroying taskprocessor \'%s\'\n", t->name);
-	/* take the taskprocessor out of the singleton container */	
-	ast_mutex_lock(&tps_marshall);
-	ao2_unlink(tps_singletons, t);
-	ast_mutex_unlock(&tps_marshall);
-	ast_debug(5, "taskprocessor \'%s\' unlinked from tps_singletons\n", t->name);
+	ast_log(LOG_DEBUG, "destroying taskprocessor \'%s\'\n", t->name);
 	/* kill it */	
 	ast_mutex_lock(&t->taskprocessor_lock);
 	t->poll_thread_run = 0;
@@ -498,6 +511,7 @@
 	ast_mutex_unlock(&t->taskprocessor_lock);
 	pthread_join(t->poll_thread, NULL);
 	t->poll_thread = AST_PTHREADT_NULL;
+	/* free it */
 	if (t->stats) {
 		ast_free(t->stats);
 		t->stats = NULL;




More information about the asterisk-commits mailing list