[asterisk-commits] russell: branch russell/sched_thread r143120 - in /team/russell/sched_thread:...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Sun Sep 14 19:07:38 CDT 2008


Author: russell
Date: Sun Sep 14 19:07:38 2008
New Revision: 143120

URL: http://svn.digium.com/view/asterisk?view=rev&rev=143120
Log:
Merge some changes to the taskprocessor API ...
 - s/taskprocessor/taskproc/ because I'm lazy
 - s/unreference/unref/ again, because I'm lazy
 - Add some new stuff - ast_taskproc_pool
   - This allows you to create a pool of task processors.
   - When you queue tasks up to a pool of task processors, the tasks are currently
     distributed across the processors round robin.  However, I plan to add a way
     to let you customize the distribution logic.

Modified:
    team/russell/sched_thread/apps/app_queue.c
    team/russell/sched_thread/apps/app_voicemail.c
    team/russell/sched_thread/include/asterisk/taskprocessor.h
    team/russell/sched_thread/main/event.c
    team/russell/sched_thread/main/pbx.c
    team/russell/sched_thread/main/taskprocessor.c

Modified: team/russell/sched_thread/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/apps/app_queue.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/apps/app_queue.c (original)
+++ team/russell/sched_thread/apps/app_queue.c Sun Sep 14 19:07:38 2008
@@ -132,7 +132,7 @@
 	{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
 };
 
-static struct ast_taskprocessor *devicestate_tps;
+static struct ast_taskproc *devicestate_tps;
 
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
@@ -778,7 +778,7 @@
 	}
 	sc->state = state;
 	strcpy(sc->dev, device);
-	if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
+	if (ast_taskproc_push(devicestate_tps, handle_statechange, sc) < 0) {
 		ast_free(sc);
 	}
 }
@@ -6367,7 +6367,7 @@
 		queue_unref(q);
 	}
 	ao2_ref(queues, -1);
-	devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
+	devicestate_tps = ast_taskproc_unref(devicestate_tps);
 	ast_unload_realtime("queue_members");
 	return res;
 }
@@ -6414,7 +6414,7 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
+	if (!(devicestate_tps = ast_taskproc_get("app_queue", 0))) {
 		ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
 	}
 

Modified: team/russell/sched_thread/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/apps/app_voicemail.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/apps/app_voicemail.c (original)
+++ team/russell/sched_thread/apps/app_voicemail.c Sun Sep 14 19:07:38 2008
@@ -639,7 +639,7 @@
 	uint32_t uniqueid;
 };
 
-static struct ast_taskprocessor *mwi_subscription_tps;
+static struct ast_taskproc *mwi_subscription_tps;
 
 static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
 
@@ -9746,7 +9746,7 @@
 
 	u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
 	*uniqueid = u;
-	if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
+	if (ast_taskproc_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
 		ast_free(uniqueid);
 	}
 }
@@ -9769,7 +9769,7 @@
 	mwist->context = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT));
 	mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
 	
-	if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
+	if (ast_taskproc_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
 		ast_free(mwist);
 	}
 }
@@ -10651,7 +10651,7 @@
 	if (poll_thread != AST_PTHREADT_NULL)
 		stop_poll_thread();
 
-	mwi_subscription_tps = ast_taskprocessor_unreference(mwi_subscription_tps);
+	mwi_subscription_tps = ast_taskproc_unref(mwi_subscription_tps);
 	ast_unload_realtime("voicemail");
 	ast_unload_realtime("voicemail_data");
 
@@ -10669,7 +10669,7 @@
 	/* compute the location of the voicemail spool directory */
 	snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
 	
-	if (!(mwi_subscription_tps = ast_taskprocessor_get("app_voicemail", 0))) {
+	if (!(mwi_subscription_tps = ast_taskproc_get("app_voicemail", 0))) {
 		ast_log(AST_LOG_WARNING, "failed to reference mwi subscription taskprocessor.  MWI will not work\n");
 	}
 

Modified: team/russell/sched_thread/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/include/asterisk/taskprocessor.h?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/include/asterisk/taskprocessor.h (original)
+++ team/russell/sched_thread/include/asterisk/taskprocessor.h Sun Sep 14 19:07:38 2008
@@ -1,9 +1,10 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2007-2008, Dwayne M. Hubbard 
+ * Copyright (C) 2007 - 2008, Digium, Inc.
  *
  * Dwayne M. Hubbard <dhubbard at digium.com>
+ * Russell Bryant <russell at digium.com>
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk project. Please do not directly contact
@@ -15,24 +16,25 @@
  * the GNU General Public License Version 2. See the LICENSE file
  * at the top of the source tree.
  */
-#include "asterisk.h"
+
+#ifndef __AST_TASKPROC_H__
+#define __AST_TASKPROC_H__
+
 #include "asterisk/lock.h"
 #include "asterisk/linkedlists.h"
 #include "asterisk/utils.h"
 #include "asterisk/options.h"
 
-#ifndef __taskprocessor_h__
-#define __taskprocessor_h__
-
 /*!
- * \file taskprocessor.h
+ * \file
  * \brief An API for managing task processing threads that can be shared across modules
  *
  * \author Dwayne M. Hubbard <dhubbard at digium.com>
+ * \author Russell Bryant <russell at digium.com>
  *
  * \note A taskprocessor is a named singleton containing a processing thread and
  * a task queue that serializes tasks pushed into it by [a] module(s) that reference the taskprocessor.  
- * A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get()
+ * A taskprocessor is created the first time its name is requested via the ast_taskproc_get()
  * function and destroyed when the taskprocessor reference count reaches zero.
  *
  * Modules that obtain a reference to a taskprocessor can queue tasks into the taskprocessor
@@ -40,21 +42,21 @@
  * of the queue.  A task is a wrapper around a task-handling function pointer and a data
  * pointer.  It is the responsibility of the task handling function to free memory allocated for
  * the task data pointer.  A task is pushed into a taskprocessor queue using the 
- * ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the
+ * ast_taskproc_push(taskprocessor, taskhandler, taskdata) function and freed by the
  * taskprocessor after the task handling function returns.  A module releases its reference to a
- * taskprocessor using the ast_taskprocessor_unreference() function which may result in the
+ * taskprocessor using the ast_taskproc_unref() function which may result in the
  * destruction of the taskprocessor if the taskprocessor's reference count reaches zero.  Tasks waiting
  * to be processed in the taskprocessor queue when the taskprocessor reference count reaches zero
  * will be purged and released from the taskprocessor queue without being processed.
  */
-struct ast_taskprocessor;
+struct ast_taskproc;
 
 /*! \brief ast_tps_options for specification of taskprocessor options
  *
- * Specify whether a taskprocessor should be created via ast_taskprocessor_get() if the taskprocessor 
+ * Specify whether a taskprocessor should be created via ast_taskproc_get() if the taskprocessor 
  * does not already exist.  The default behavior is to create a taskprocessor if it does not already exist 
  * and provide its reference to the calling function.  To only return a reference to a taskprocessor if 
- * and only if it exists, use the TPS_REF_IF_EXISTS option in ast_taskprocessor_get(). */
+ * and only if it exists, use the TPS_REF_IF_EXISTS option in ast_taskproc_get(). */
 enum ast_tps_options {
 	/*! \brief return a reference to a taskprocessor, create one if it does not exist */
 	TPS_REF_DEFAULT = 0,
@@ -65,13 +67,13 @@
 /*! \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
  *
  * The default behavior of instantiating a taskprocessor if one does not already exist can be
- * disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().
+ * disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskproc_get().
  * \param name The name of the taskprocessor
  * \param create Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does 
  * not already exist
  * return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the
  * TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create);
+struct ast_taskproc *ast_taskproc_get(char *name, enum ast_tps_options create);
 
 /*! \brief Unreference the specified taskprocessor and its reference count will decrement.
  *
@@ -79,16 +81,38 @@
  * themself when the taskprocessor reference count reaches zero.
  * \param tps taskprocessor to unreference
  * \return NULL */
-void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
+void *ast_taskproc_unref(struct ast_taskproc *tps);
+
+/*! \brief a task handler */
+typedef int (*ast_taskproc_cb)(void *datap);
 
 /*! \brief Push a task into the specified taskprocessor queue and signal the taskprocessor thread
  * \param tps The taskprocessor structure
  * \param task_exe The task handling function to push into the taskprocessor queue
  * \param datap The data to be used by the task handling function
  * \return zero on success, -1 on failure */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+int ast_taskproc_push(struct ast_taskproc *tps, ast_taskproc_cb cb, void *datap);
 
-/*! \brief Return the name of the taskprocessor singleton */
-const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
-#endif
+/*! 
+ * \brief Return the name of the taskprocessor singleton 
+ *
+ * \note The return value is only valid as long as the calling code still holds
+ *       a valid reference to the task processor.
+ */
+const char *ast_taskproc_name(struct ast_taskproc *tps);
 
+/*! \brief An opaque type for a pool of task processors */
+struct ast_taskproc_pool;
+/*! \brief Create an empty pool; \a name_prefix starts the name of each taskprocessor later created for it.  Returns NULL on allocation failure. */
+struct ast_taskproc_pool *ast_taskproc_pool_create(const char *name_prefix);
+/*! \brief Add or remove taskprocessors until the pool holds \a size of them.  Returns the number of taskprocessors in the pool afterwards. */
+unsigned int ast_taskproc_pool_set_size(struct ast_taskproc_pool *p, 
+		unsigned int size);
+/*! \brief Release the pool's container of taskprocessors and free the pool.  Always returns NULL. */
+struct ast_taskproc_pool *ast_taskproc_pool_destroy(struct ast_taskproc_pool *p);
+/*! \brief Hand a task to the next taskprocessor in the pool, cycling through them in turn.  Returns 0 on success, -1 if the pool is empty. */
+int ast_taskproc_pool_push(struct ast_taskproc_pool *p, ast_taskproc_cb cb,
+		void *data);
+
+#endif /* __AST_TASKPROC_H__ */
+

Modified: team/russell/sched_thread/main/event.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/main/event.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/main/event.c (original)
+++ team/russell/sched_thread/main/event.c Sun Sep 14 19:07:38 2008
@@ -37,7 +37,7 @@
 #include "asterisk/utils.h"
 #include "asterisk/taskprocessor.h"
 
-struct ast_taskprocessor *event_dispatcher;
+struct ast_taskproc *event_dispatcher;
 
 /*!
  * \brief An event information element
@@ -1070,7 +1070,7 @@
 
 	event_ref->event = event;
 
-	return ast_taskprocessor_push(event_dispatcher, handle_event, event_ref);
+	return ast_taskproc_push(event_dispatcher, handle_event, event_ref);
 }
 
 void ast_event_init(void)
@@ -1083,5 +1083,5 @@
 	for (i = 0; i < AST_EVENT_TOTAL; i++)
 		AST_RWLIST_HEAD_INIT(&ast_event_cache[i]);
 
-	event_dispatcher = ast_taskprocessor_get("core_event_dispatcher", 0);
-}
+	event_dispatcher = ast_taskproc_get("core_event_dispatcher", 0);
+}

Modified: team/russell/sched_thread/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/main/pbx.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/main/pbx.c (original)
+++ team/russell/sched_thread/main/pbx.c Sun Sep 14 19:07:38 2008
@@ -124,7 +124,7 @@
 struct ast_context;
 struct ast_app;
 
-static struct ast_taskprocessor *device_state_tps;
+static struct ast_taskproc *device_state_tps;
 
 AST_THREADSTORAGE(switch_data);
 
@@ -8410,7 +8410,7 @@
 	if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
 		return;
 	strcpy(sc->dev, device);
-	if (ast_taskprocessor_push(device_state_tps, handle_statechange, sc) < 0) {
+	if (ast_taskproc_push(device_state_tps, handle_statechange, sc) < 0) {
 		ast_free(sc);
 	}
 }
@@ -8421,7 +8421,7 @@
 
 	/* Initialize the PBX */
 	ast_verb(1, "Asterisk PBX Core Initializing\n");
-	if (!(device_state_tps = ast_taskprocessor_get("pbx-core", 0))) {
+	if (!(device_state_tps = ast_taskproc_get("pbx-core", 0))) {
 		ast_log(LOG_WARNING, "failed to create pbx-core taskprocessor\n");
 	}
 

Modified: team/russell/sched_thread/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/main/taskprocessor.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/main/taskprocessor.c (original)
+++ team/russell/sched_thread/main/taskprocessor.c Sun Sep 14 19:07:38 2008
@@ -1,9 +1,10 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2007-2008, Dwayne M. Hubbard 
+ * Copyright (C) 2007 - 2008, Digium, Inc.
  *
  * Dwayne M. Hubbard <dhubbard at digium.com>
+ * Russell Bryant <russell at digium.com>
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk project. Please do not directly contact
@@ -15,11 +16,13 @@
  * the GNU General Public License Version 2. See the LICENSE file
  * at the top of the source tree.
  */
+
 /*! \file
  *
  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
  *
  * \author Dwayne Hubbard <dhubbard at digium.com>
+ * \author Russell Bryant <russell at digium.com>
  */
 
 #include "asterisk.h"
@@ -51,16 +54,16 @@
 	AST_LIST_ENTRY(tps_task) list;
 };
 
-/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
-struct tps_taskprocessor_stats {
+/*! \brief tps_taskproc_stats maintain statistics for a taskprocessor. */
+struct tps_taskproc_stats {
 	/*! \brief This is the maximum number of tasks queued at any one time */
 	unsigned long max_qsize;
 	/*! \brief This is the current number of tasks processed */
 	unsigned long _tasks_processed_count;
 };
 
-/*! \brief A ast_taskprocessor structure is a singleton by name */
-struct ast_taskprocessor {
+/*! \brief A ast_taskproc structure is a singleton by name */
+struct ast_taskproc {
 	/*! \brief Friendly name of the taskprocessor */
 	char *name;
 	/*! \brief Thread poll condition */
@@ -72,18 +75,24 @@
 	/*! \brief Taskprocesor thread run flag */
 	unsigned char poll_thread_run;
 	/*! \brief Taskprocessor statistics */
-	struct tps_taskprocessor_stats *stats;
+	struct tps_taskproc_stats *stats;
 	/*! \brief Taskprocessor current queue size */
 	long tps_queue_size;
 	/*! \brief Taskprocessor queue */
 	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
 	/*! \brief Taskprocessor singleton list entry */
-	AST_LIST_ENTRY(ast_taskprocessor) list;
+	AST_LIST_ENTRY(ast_taskproc) list;
 };
 #define TPS_MAX_BUCKETS 7
 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
 static struct ao2_container *tps_singletons;
 
+struct ast_taskproc_pool {	/* A set of taskprocessors that pushed tasks are spread across */
+	struct ao2_container *tps;	/*!< Taskprocessors that currently belong to this pool */
+	struct ao2_iterator iter;	/*!< Position in tps; advanced on every push to choose the next taskprocessor */
+	char name_prefix[1];	/*!< Prefix for generated taskprocessor names; the whole string is copied here, so the allocation must extend past the struct */
+};
+
 /*! \brief CLI 'taskprocessor ping <blah>' operation requires a ping condition */
 static ast_cond_t cli_ping_cond;
 
@@ -99,16 +108,16 @@
 static void *tps_processing_function(void *data);
 
 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
-static void tps_taskprocessor_destroy(void *tps);
+static void tps_taskproc_destroy(void *tps);
 
 /*! \brief CLI 'taskprocessor ping <blah>' handler function */
 static int tps_ping_handler(void *datap);
 
 /*! \brief Remove the front task off the taskprocessor queue */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
+static struct tps_task *tps_taskproc_pop(struct ast_taskproc *tps);
 
 /*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
+static int tps_taskproc_depth(struct ast_taskproc *tps);
 
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -153,7 +162,7 @@
 }
 
 /* taskprocessor tab completion */
-static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a) 
+static char *tps_taskproc_tab_complete(struct ast_taskproc *p, struct ast_cli_args *a) 
 {
 	int tklen;
 	int wordnum = 0;
@@ -192,7 +201,7 @@
 	char *name;
 	struct timeval when;
 	struct timespec ts;
-	struct ast_taskprocessor *tps = NULL;
+	struct ast_taskproc *tps = NULL;
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -202,14 +211,14 @@
 			"	Displays the time required for a task to be processed\n";
 		return NULL;
 	case CLI_GENERATE:
-		return tps_taskprocessor_tab_complete(tps, a);
+		return tps_taskproc_tab_complete(tps, a);
 	}
 
 	if (a->argc != 4)
 		return CLI_SHOWUSAGE;
 
 	name = a->argv[3];
-	if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
+	if (!(tps = ast_taskproc_get(name, TPS_REF_IF_EXISTS))) {
 		ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
 		return CLI_SUCCESS;
 	}
@@ -218,7 +227,7 @@
 	ts.tv_sec = when.tv_sec;
 	ts.tv_nsec = when.tv_usec * 1000;
 	ast_mutex_lock(&cli_ping_cond_lock);
-	if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
+	if (ast_taskproc_push(tps, tps_ping_handler, 0) < 0) {
 		ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
 		ao2_ref(tps, -1);
 		return CLI_FAILURE;
@@ -239,7 +248,7 @@
 	unsigned long qsize;
 	unsigned long maxqsize;
 	unsigned long processed;
-	struct ast_taskprocessor *p;
+	struct ast_taskproc *p;
 	struct ao2_iterator i;
 
 	switch (cmd) {
@@ -274,12 +283,12 @@
 /* this is the task processing worker function */
 static void *tps_processing_function(void *data)
 {
-	struct ast_taskprocessor *i = data;
+	struct ast_taskproc *i = data;
 	struct tps_task *t;
 	int size;
 
 	if (!i) {
-		ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
+		ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskproc structure.\n");
 		return NULL;
 	}
 
@@ -289,7 +298,7 @@
   			ast_mutex_unlock(&i->taskprocessor_lock);
  			break;
   		}
- 		if (!(size = tps_taskprocessor_depth(i))) {
+ 		if (!(size = tps_taskproc_depth(i))) {
  			ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
   			if (!i->poll_thread_run) {
   				ast_mutex_unlock(&i->taskprocessor_lock);
@@ -298,7 +307,7 @@
   		}
   		ast_mutex_unlock(&i->taskprocessor_lock);
  		/* stuff is in the queue */
- 		if (!(t = tps_taskprocessor_pop(i))) {
+ 		if (!(t = tps_taskproc_pop(i))) {
  			ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
  			continue;
  		}
@@ -320,7 +329,7 @@
  
  		tps_task_free(t);
   	}
-	while ((t = tps_taskprocessor_pop(i))) {
+	while ((t = tps_taskproc_pop(i))) {
 		tps_task_free(t);
 	}
 	return NULL;
@@ -329,7 +338,7 @@
 /* hash callback for astobj2 */
 static int tps_hash_cb(const void *obj, const int flags)
 {
-	const struct ast_taskprocessor *tps = obj;
+	const struct ast_taskproc *tps = obj;
 
 	return ast_str_hash(tps->name);
 }
@@ -337,15 +346,15 @@
 /* compare callback for astobj2 */
 static int tps_cmp_cb(void *obj, void *arg, int flags)
 {
-	struct ast_taskprocessor *lhs = obj, *rhs = arg;
+	struct ast_taskproc *lhs = obj, *rhs = arg;
 
 	return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
 }
 
 /* destroy the taskprocessor */
-static void tps_taskprocessor_destroy(void *tps)
-{
-	struct ast_taskprocessor *t = tps;
+static void tps_taskproc_destroy(void *tps)
+{
+	struct ast_taskproc *t = tps;
 	
 	if (!tps) {
 		ast_log(LOG_ERROR, "missing taskprocessor\n");
@@ -370,7 +379,7 @@
 }
 
 /* pop the front task and return it */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
+static struct tps_task *tps_taskproc_pop(struct ast_taskproc *tps)
 {
 	struct tps_task *task;
 
@@ -386,13 +395,13 @@
 	return task;
 }
 
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+static int tps_taskproc_depth(struct ast_taskproc *tps)
 {
 	return (tps) ? tps->tps_queue_size : -1;
 }
 
 /* taskprocessor name accessor */
-const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
+const char *ast_taskproc_name(struct ast_taskproc *tps)
 {
 	if (!tps) {
 		ast_log(LOG_ERROR, "no taskprocessor specified!\n");
@@ -404,9 +413,9 @@
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
  * create the taskprocessor if we were told via ast_tps_options to return a reference only 
  * if it already exists */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create)
-{
-	struct ast_taskprocessor *p, tmp_tps = {
+struct ast_taskproc *ast_taskproc_get(char *name, enum ast_tps_options create)
+{
+	struct ast_taskproc *p, tmp_tps = {
 		.name = name,
 	};
 		
@@ -426,7 +435,7 @@
 		return NULL;
 	}
 	/* create a new taskprocessor */
-	if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
+	if (!(p = ao2_alloc(sizeof(*p), tps_taskproc_destroy))) {
 		ao2_unlock(tps_singletons);
 		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
 		return NULL;
@@ -465,7 +474,7 @@
 }
 
 /* decrement the taskprocessor reference count and unlink from the container if necessary */
-void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
+void *ast_taskproc_unref(struct ast_taskproc *tps)
 {
 	if (tps) {
 		ao2_lock(tps_singletons);
@@ -479,7 +488,7 @@
 }
 
 /* push the task into the taskprocessor queue */	
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+int ast_taskproc_push(struct ast_taskproc *tps, int (*task_exe)(void *datap), void *datap)
 {
 	struct tps_task *t;
 
@@ -499,3 +508,104 @@
 	return 0;
 }
 
+/*! \brief Create an empty taskprocessor pool; members added later are named "<name_prefix>-<random>".
+ *  \return the new pool, or NULL if \a name_prefix is NULL or an allocation fails. */
+struct ast_taskproc_pool *ast_taskproc_pool_create(const char *name_prefix)
+{
+	struct ast_taskproc_pool *p;
+	/* name_prefix[1] already holds the terminator, so ADD the prefix length (multiplying gave 0 bytes for "") */
+	if (!name_prefix || !(p = ast_calloc(1, sizeof(*p) + strlen(name_prefix)))) {
+		return NULL;
+	}
+
+	if (!(p->tps = ao2_container_alloc(1, tps_hash_cb, tps_cmp_cb))) {
+		ast_free(p);
+		return NULL;
+	}
+
+	p->iter = ao2_iterator_init(p->tps, 0);
+	strcpy(p->name_prefix, name_prefix);
+	return p;
+}
+
+static int cb_true(void *obj, void *arg, int flags)
+{
+	return CMP_MATCH | CMP_STOP; /* accept the first object visited and end the traversal, so one call selects exactly one object */
+}
+
+/*! \brief Grow or shrink the pool until it holds \a size taskprocessors.
+ *  \return the number of taskprocessors in the pool afterwards; differs from \a size if resizing failed. */
+unsigned int ast_taskproc_pool_set_size(struct ast_taskproc_pool *p,
+		unsigned int size)
+{
+	unsigned int count;
+
+	ao2_lock(p->tps);
+	/* Handle downsizing: each unlinked member comes back referenced, so unref it */
+	while (ao2_container_count(p->tps) > size) {
+		struct ast_taskproc *tp;
+
+		if (!(tp = ao2_callback(p->tps, OBJ_UNLINK, cb_true, NULL))) {
+			/* Retrying cannot help and would spin forever with the lock held */
+			ast_log(LOG_ERROR, "taskprocessor unlink fail!!11\n");
+			break;
+		}
+		ast_taskproc_unref(tp);
+	}
+
+	/* Now handle an increase in size */
+	while (ao2_container_count(p->tps) < size) {
+		struct ast_taskproc *tp;
+		char buf[128];
+
+		/* Build a randomized name from the pool's prefix for the new member */
+		snprintf(buf, sizeof(buf), "%s-%lu", p->name_prefix, ast_random());
+		if (!(tp = ast_taskproc_get(buf, TPS_REF_DEFAULT))) {
+			break;
+		}
+
+		/* Once linked into the pool, give back the reference from ast_taskproc_get() */
+		ao2_link(p->tps, tp);
+		ast_taskproc_unref(tp);
+	}
+
+	/* Read the final size before unlocking so it describes this resize */
+	count = ao2_container_count(p->tps);
+	ao2_unlock(p->tps);
+	return count;
+}
+
+/*! \brief Release every taskprocessor in the pool, then free the pool itself.  \return NULL always. */
+struct ast_taskproc_pool *ast_taskproc_pool_destroy(struct ast_taskproc_pool *p)
+{
+	/* Shrink to zero first so members are released with ast_taskproc_unref(), exactly as downsizing does */
+	ast_taskproc_pool_set_size(p, 0);
+	ao2_ref(p->tps, -1);
+	ast_free(p);
+	return NULL;
+}
+
+/*! \brief Queue a task on the next taskprocessor in the pool (round robin).
+ *  \return the result of ast_taskproc_push(), or -1 if the pool has no taskprocessors. */
+int ast_taskproc_pool_push(struct ast_taskproc_pool *p, ast_taskproc_cb cb,
+		void *data)
+{
+	struct ast_taskproc *tp;
+	int res;
+
+	ao2_lock(p->tps);
+	if (!(tp = ao2_iterator_next(&p->iter))) {
+		/* Walked off the end: restart the iterator from the first member */
+		p->iter = ao2_iterator_init(p->tps, 0);
+		if (!(tp = ao2_iterator_next(&p->iter))) {
+			ast_log(LOG_ERROR, "Failed to push task into pool\n");
+			ao2_unlock(p->tps);
+			return -1;
+		}
+	}
+	/* Report a failed push instead of 0 so the caller knows to free its task data */
+	res = ast_taskproc_push(tp, cb, data);
+	ao2_unlock(p->tps);
+	ast_taskproc_unref(tp);
+	return res;
+}




More information about the asterisk-commits mailing list