[asterisk-commits] dhubbard: branch group/taskprocessors r115192 - in /team/group/taskprocessors...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri May 2 01:23:02 CDT 2008


Author: dhubbard
Date: Fri May  2 01:23:01 2008
New Revision: 115192

URL: http://svn.digium.com/view/asterisk?view=rev&rev=115192
Log:
convert ast_task to tps_task
merge ast_task_alloc and ast_taskprocessor_push interfaces
ast_task_alloc, ast_task_free, ast_taskprocessor_depth, ast_taskprocessor_pop are now static and converted to tps_ functions
remove custom function specification option from ast_taskprocessor_get


Modified:
    team/group/taskprocessors/apps/app_queue.c
    team/group/taskprocessors/apps/app_voicemail.c
    team/group/taskprocessors/channels/chan_sip.c
    team/group/taskprocessors/include/asterisk/taskprocessor.h
    team/group/taskprocessors/main/pbx.c
    team/group/taskprocessors/main/taskprocessor.c

Modified: team/group/taskprocessors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_queue.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/apps/app_queue.c (original)
+++ team/group/taskprocessors/apps/app_queue.c Fri May  2 01:23:01 2008
@@ -132,7 +132,7 @@
 	{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
 };
 
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *devicestate_tps;
 
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
@@ -791,7 +791,6 @@
 	enum ast_device_state state;
 	const char *device;
 	struct statechange *sc;
-	void *task;
 	size_t datapsize;
 
 	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
@@ -808,11 +807,8 @@
 	}
 	sc->state = state;
 	strcpy(sc->dev, device);
-	if (!(task = ast_task_alloc(handle_statechange, sc))) {
-		ast_log(LOG_WARNING, "failed to allocate a task for a device state change notification\n");
-	} else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
-		ast_log(LOG_WARNING, "failed to push a device state change to the app_queue taskprocessor!!\n");
-		ast_task_free(task);
+	if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
+		ast_free(sc);
 	}
 }
 
@@ -6237,7 +6233,7 @@
 		queue_unref(q);
 	}
 	ao2_ref(queues, -1);
-	taskprocessor = ast_taskprocessor_unreference(taskprocessor);
+	devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
 	return res;
 }
 
@@ -6283,7 +6279,9 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	taskprocessor = ast_taskprocessor_get("app_queue", 0, 0);
+	if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
+		ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
+	}
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
 		res = -1;

Modified: team/group/taskprocessors/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_voicemail.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/apps/app_voicemail.c (original)
+++ team/group/taskprocessors/apps/app_voicemail.c Fri May  2 01:23:01 2008
@@ -629,7 +629,7 @@
 	uint32_t uniqueid;
 };
 
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *mwi_subscription_tps;
 
 static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
 
@@ -8782,7 +8782,6 @@
 
 static void mwi_unsub_event_cb(const struct ast_event *event, void *userdata)
 {
-	struct ast_task *t;
 	uint32_t u, *uniqueid = ast_calloc(1, sizeof(*uniqueid));
 	if (ast_event_get_type(event) != AST_EVENT_UNSUB)
 		return;
@@ -8790,23 +8789,17 @@
 	if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI)
 		return;
 
-	if (!taskprocessor) {
-		ast_log(LOG_WARNING, "taskprocessor is still NULL\n");
-	}
 	u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
 	*uniqueid = u;
-	if (!(t = ast_task_alloc(handle_unsubscribe, uniqueid))) {
-		ast_log(LOG_WARNING, "failed to allocate a task for a mwi unsubcribe event\n");
-	} else if (ast_taskprocessor_push(taskprocessor, t) < 0) {
-		ast_log(LOG_ERROR, "failed to push a voicemail mwi unsubscribe task to taskprocessor\n");
-		ast_task_free(t);
+	if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
+		ast_free(uniqueid);
 	}
 }
 
 static void mwi_sub_event_cb(const struct ast_event *event, void *userdata)
 {
 	struct mwi_sub_task *mwist;
-	struct ast_task *t;
+	
 	if (ast_event_get_type(event) != AST_EVENT_SUB)
 		return;
 
@@ -8821,11 +8814,8 @@
 	mwist->context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
 	mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
 	
-	if (!(t = ast_task_alloc(handle_subscribe, mwist))) {
-		ast_log(LOG_WARNING, "failed to allocate a task for a mwi subscribe event\n");
-	} else if (ast_taskprocessor_push(taskprocessor, t) < 0) {
-		ast_log(LOG_ERROR, "failed to push a voicemail mwi subscribe task to taskprocessor\n");
-		ast_task_free(t);
+	if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
+		ast_free(mwist);
 	}
 }
 
@@ -9649,7 +9639,7 @@
 	if (poll_thread != AST_PTHREADT_NULL)
 		stop_poll_thread();
 
-	taskprocessor = ast_taskprocessor_unreference(taskprocessor);
+	mwi_subscription_tps = ast_taskprocessor_unreference(mwi_subscription_tps);
 	return res;
 }
 
@@ -9662,8 +9652,9 @@
 	/* compute the location of the voicemail spool directory */
 	snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
 	
-	/* this should probably go into load_config() */
-	taskprocessor = ast_taskprocessor_get("app_voicemail", 0, 0);
+	if (!(mwi_subscription_tps = ast_taskprocessor_get("app_voicemail", 0))) {
+		ast_log(LOG_WARNING, "failed to reference mwi subscription taskprocessor.  MWI will not work\n");
+	}
 
 	if ((res = load_config(0)))
 		return res;

Modified: team/group/taskprocessors/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/channels/chan_sip.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/channels/chan_sip.c (original)
+++ team/group/taskprocessors/channels/chan_sip.c Fri May  2 01:23:01 2008
@@ -1321,7 +1321,7 @@
 	struct sip_st_dlg *stimer;		/*!< SIP Session-Timers */              
 };
 
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *sipsock_tps;
 struct sip_handle_request_task {
 	struct sip_request req;
 	struct sockaddr_in sin;
@@ -19114,7 +19114,6 @@
 static int sipsock_read(int *id, int fd, short events, void *ignore)
 {
 	struct sip_handle_request_task *taskdata;
-	void *task;
 	int res;
 	socklen_t len;
 	static char readbuf[65535];
@@ -19150,12 +19149,10 @@
 	taskdata->req.socket.port = bindaddr.sin_port;
 	taskdata->req.socket.lock = NULL;
 
-	if (!(task = ast_task_alloc(handle_request_do, taskdata))) {
-		ast_log(LOG_WARNING, "failed to allocate a task for SIP message\n");
-	} else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
-		ast_log(LOG_WARNING, "failed to push a SIP message to taskprocessor\n");
-		ast_task_free(task);
-	}
+	if (ast_taskprocessor_push(sipsock_tps, handle_request_do, taskdata) < 0) {
+		ast_free(taskdata);
+	}
+		
 	return 1;
 }
 
@@ -22605,7 +22602,9 @@
 	
 	ASTOBJ_CONTAINER_INIT(&regl); /* Registry object list -- not searched for anything */
 	/* this is not the best place to put this */
-	taskprocessor = ast_taskprocessor_get("sipsock", 0, 0);
+	if (!(sipsock_tps = ast_taskprocessor_get("sipsock", 0))) {
+		ast_log(LOG_WARNING, "failed to create sipsock taskprocessor.  SIP is not going to work\n");
+	}
 	/* the fact that ao2_containers can't resize automatically is a major worry! */
     /* if the number of objects gets above MAX_XXX_BUCKETS, things will slow down */
 	users = ao2_t_container_alloc(hash_user_size, user_hash_cb, user_cmp_cb, "allocate users");
@@ -22692,7 +22691,7 @@
 	struct ao2_iterator i;
 
 	ast_sched_dump(sched);
-	taskprocessor = ast_taskprocessor_unreference(taskprocessor);
+	sipsock_tps = ast_taskprocessor_unreference(sipsock_tps);
 
 	/* First, take us out of the channel type list */
 	ast_channel_unregister(&sip_tech);

Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Fri May  2 01:23:01 2008
@@ -30,21 +30,21 @@
  *
  * \author Dwayne M. Hubbard <dhubbard at digium.com>
  *
- * \note A taskprocessor is a singleton processing thread that can be referenced
- * with a friendly name and shared across modules.  Modules that reference a taskprocessor
- * can push ast_task objects into the default taskprocessor FIFO container.  The default
- * taskprocessing function can be overridden at the time of taskprocessor creation using
- * the ast_taskprocessor_get() function.  Taskprocessors utilize astobj2 and will
- * be created upon the first reference to a taskprocessor name, as well as destroy
- * themselves when the taskprocessor reference count reaches zero.
+ * \note A taskprocessor is a named singleton containing a processing thread and
+ * a task queue that serializes tasks pushed into it by one or more independent modules.  
+ * A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get()
+ * function and destroyed when the taskprocessor reference count reaches zero.
  *
- * The ast_task structure has a callback function, called execute(), that is called by the
- * taskprocessor when the task is popped off the queue.  The execute() callback handler
- * is responsible for processing the task data and freeing task data resources, if necessary
- * and then setting the task datap pointer to NULL before returning.  The taskprocessor will 
- * free the task structure after the task callback returns control to the taskprocessor function.
+ * Modules that obtain a reference to a taskprocessor can queue tasks into the taskprocessor
+ * where tasks are processed by the singleton processing thread when the task reaches the front 
+ * of the queue.  A task is a wrapper around a task-handling function pointer and a data
+ * pointer.  It is the responsibility of the task handling function to free memory allocated for
+ * the task data pointer.  A task is obtained using the ast_task_alloc() and freed by the 
+ * taskprocessor after the task has been processed.  A module releases its reference to a
+ * taskprocessor using the ast_taskprocessor_unreference() function.  Tasks waiting to be
+ * processed in the taskprocessor queue when the taskprocessor reference count reaches zero
+ * will be purged and released from the taskprocessor queue without being processed.
  */
-struct ast_task;
 struct ast_taskprocessor;
 
 /*! \brief ast_tps_options is used to specify whether a taskprocessor should be created
@@ -58,37 +58,16 @@
 	TPS_REF_IF_EXISTS = (1 << 0),
 };
 
-/*! \brief Initialize the taskprocessor subsystem */
-int ast_tps_init(void);
-
-/*! \brief Allocate and initialize a task structure.
- * \param task_exe The task handling function that will be called when this task is popped
- * off the taskprocessor queue and executed.
- * \param datap The data to pass to the task_exe function when the task is executed
- * \param src The calling function name (this will become DEBUG only)
- * \return A task structure ready to be queued into a taskprocessor, NULL on error */
-struct ast_task *ast_task_alloc(int (*task_exe)(void *datap), void *datap);
-
-/*! \brief Free the task resources
- * \note This function does not free any memory pointed to by the data pointer.  It is
- * the responsibility of the task handling function to free any resources managed by
- * a task's data pointer.
- * \param task The task to free
- * \return zero */
-int ast_task_free(struct ast_task *task);
-
 /*! \brief Get a reference to a taskprocessor with the specified name; create a taskprocessor
- * if one with the specified name does not already exist.  The default processing function
- * can be overridden using the second argument, custom_func, at the time of taskprocessor creation.
+ * if one with the specified name does not already exist.  
  * The default behavior of instantiating a taskprocessor if one does not already exist can be
  * disabled by specifying the TPS_REF_IF_EXISTS reference type as the third argument to ast_taskprocessor_get().
  * \param name The name of the taskprocessor
- * \param custom_func Use 0 by default or specify a custom taskprocessor thread function
  * \param create Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does 
  * not already exist
  * return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the
  * TPS_REF_IF_EXISTS reference type is specified */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_options create);
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create);
 
 /*! \brief Unreference the specified taskprocessor and its reference count will decrement.
  * taskprocessors use astobj2 and will destroy themselves when their reference count reaches zero.
@@ -100,18 +79,7 @@
  * \param tps The taskprocessor structure
  * \param t The task to push into the taskprocessor queue
  * \return zero on success, -1 on failure */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t);
-
-/*! \brief If a task is in the taskprocessor queue it will be popped from the queue
- * and its address will be returned.  This function returns NULL if the queue is empty.
- * \param tps The taskprocessor structure
- * \return A task pointer if the queue is not empty, NULL when the queue is empty. */
-struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps);
-
-/*! \brief Return the number of tasks waiting in the taskprocessor queue
- * \param tps taskprocessor to return queue depth
- * \return depth on success, -1 on error */
-int ast_taskprocessor_depth(struct ast_taskprocessor *tps);
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
 
 /*! \brief Return the name of the taskprocessor singleton */
 char * ast_taskprocessor_name(struct ast_taskprocessor *tps);

Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Fri May  2 01:23:01 2008
@@ -124,7 +124,7 @@
 struct ast_context;
 struct ast_app;
 
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *device_state_tps;
 
 AST_THREADSTORAGE(switch_data);
 
@@ -8044,7 +8044,6 @@
 {
 	const char *device;
 	struct statechange *sc;
-	void *task;
 
 	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
 	if (ast_strlen_zero(device)) {
@@ -8055,11 +8054,8 @@
 	if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
 		return;
 	strcpy(sc->dev, device);
-	if (!(task = ast_task_alloc(handle_statechange, sc))) {
-		ast_log(LOG_WARNING, "failed to allocate a task for a device state change notification\n");
-	} else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
-		ast_log(LOG_WARNING, "failed to push a device state change notification to taskprocessor\n");
-		ast_task_free(task);
+	if (ast_taskprocessor_push(device_state_tps, handle_statechange, sc) < 0) {
+		ast_free(sc);
 	}
 }
 
@@ -8069,7 +8065,9 @@
 
 	/* Initialize the PBX */
 	ast_verb(1, "Asterisk PBX Core Initializing\n");
-	taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
+	if (!(device_state_tps = ast_taskprocessor_get("pbx-core", 0))) {
+		ast_log(LOG_WARNING, "failed to create pbx-core taskprocessor\n");
+	}
 
 	ast_verb(1, "Registering builtin applications:\n");
 	ast_cli_register_multiple(pbx_cli, sizeof(pbx_cli) / sizeof(struct ast_cli_entry));

Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Fri May  2 01:23:01 2008
@@ -24,6 +24,7 @@
  */
 
 #include "asterisk.h"
+#include "asterisk/_private.h"
 #include "asterisk/module.h"
 #include "asterisk/time.h"
 #include "asterisk/astobj2.h"
@@ -34,22 +35,22 @@
 
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
-/*! \brief ast_task structure is queued to a taskprocessor and released
+/*! \brief tps_task structure is queued to a taskprocessor and released
  * by the taskprocessing thread.  The callback function that is assigned
  * to the execute() function pointer is responsible for releasing
  * datap resources if necessary. */
-struct ast_task {
+struct tps_task {
 	/*! \brief The execute() task callback function pointer */
 	int (*execute)(void *datap);
 	/*! \brief The data pointer for the task execute() function */
 	void *datap;
 	/*! \brief AST_LIST_ENTRY overhead */
-	AST_LIST_ENTRY(ast_task) list;
+	AST_LIST_ENTRY(tps_task) list;
 };
 
-/*! \brief ast_taskprocessor_stats are technically optional, but
+/*! \brief tps_taskprocessor_stats are technically optional, but
  * used by default to keep track of statistics for an individual taskprocessor */
-struct ast_taskprocessor_stats {
+struct tps_taskprocessor_stats {
 	/*! \brief This is the maximum number of tasks queued at any one time */
 	unsigned long max_qsize;
 	/*! \brief This is the current number of tasks processed */
@@ -69,14 +70,12 @@
 	ast_mutex_t taskprocessor_lock;
 	/*! \brief Taskprocesor thread run flag */
 	unsigned char poll_thread_run;
-	/*! \brief Hold a pointer to the taskprocessing function */
-	void *(*poll_function)(void*);
 	/*! \brief Taskprocessor statistics */
-	struct ast_taskprocessor_stats *stats;
+	struct tps_taskprocessor_stats *stats;
 	/*! \brief Taskprocessor current queue size */
 	long queue_size;
 	/*! \brief Taskprocessor queue */
-	AST_LIST_HEAD_NOLOCK(queue, ast_task) queue;
+	AST_LIST_HEAD_NOLOCK(queue, tps_task) queue;
 	/*! \brief Taskprocessor singleton list entry */
 	AST_LIST_ENTRY(ast_taskprocessor) list;
 };
@@ -91,9 +90,10 @@
 static int tps_hash_cb(const void *obj, const int flags);
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 static void *tps_default_processor_function(void *data);
-static struct ast_taskprocessor *tps_default_constructor(void);
 static void tps_taskprocessor_destroy(void *tps);
-static int tps_taskprocessor_ping_handler(void *datap);
+static int tps_ping_handler(void *datap);
+static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
+static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
 
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -103,25 +103,19 @@
 	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
 };
 
-/*
- * Taskprocessor Subsystem
- */
 int ast_tps_init(void)
 {
-	tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb);
-	if (!tps_singletons)
-		return AST_MODULE_LOAD_FAILURE;
-
+	if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
+		ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
+		return -1;
+	}
 	ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
 	return 0;
 }
 
-/*
- * Task allocation & free
- */
-struct ast_task *ast_task_alloc(int (*task_exe)(void *datap), void *datap)
-{
-	struct ast_task *t;
+static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
+{
+	struct tps_task *t;
 	if ((t = ast_calloc(1, sizeof(*t)))) {
 		t->execute = task_exe;
 		t->datap = datap;
@@ -129,12 +123,12 @@
 	return t;
 }
 	
-int ast_task_free(struct ast_task *task)
+static void *tps_task_free(struct tps_task *task)
 {
 	if (task) {
 		ast_free(task);
 	}
-	return 0;
+	return NULL;
 }
 
 /*
@@ -165,7 +159,7 @@
 	return name;
 }
 
-static int tps_taskprocessor_ping_handler(void *datap)
+static int tps_ping_handler(void *datap)
 {
 	ast_mutex_lock(&cli_ping_cond_lock);
 	ast_cond_signal(&cli_ping_cond);
@@ -179,7 +173,6 @@
 	char *name;
 	struct timeval tv;
 	struct timespec ts;
-	struct ast_task *t = NULL;
 	struct ast_taskprocessor *tps = NULL;
 
 	switch (cmd) {
@@ -197,23 +190,17 @@
 		return CLI_SHOWUSAGE;
 
 	name = a->argv[2];
-	if (!(tps = ast_taskprocessor_get(name, 0, TPS_REF_IF_EXISTS))) {
+	if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
 		ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
 		return CLI_SUCCESS;
 	}
 	ast_cli(a->fd, "\npinging %s ...", name);
-	if (!(t = ast_task_alloc(tps_taskprocessor_ping_handler, 0))) {
-		ast_cli(a->fd, "\n\tfailed to allocate a task\n\n");
-		ao2_ref(tps, -1);
-		return CLI_FAILURE;
-	}
 	tv = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
 	ts.tv_sec = tv.tv_sec;
 	ts.tv_nsec = tv.tv_usec * 1000;
 	ast_mutex_lock(&cli_ping_cond_lock);
-	if (ast_taskprocessor_push(tps, t) < 0) {
+	if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
 		ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
-		ast_task_free(t);
 		ao2_ref(tps, -1);
 		return CLI_FAILURE;
 	}
@@ -272,27 +259,26 @@
  */
 static void *tps_default_processor_function(void *data)
 {
-	struct ast_taskprocessor *i;
-	struct ast_task *t;
+	struct ast_taskprocessor *i = data;
+	struct tps_task *t;
 	int size;
 
-	i = (struct ast_taskprocessor*)data;
 	if (!i) {
 		ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
 		return NULL;
 	}
 
 	while (i->poll_thread_run) {
-		if ((size = ast_taskprocessor_depth(i)) > 0) {
+		if ((size = tps_taskprocessor_depth(i)) > 0) {
 			/* stuff is in the queue */
-			t = ast_taskprocessor_pop(i);
+			t = tps_taskprocessor_pop(i);
 			if (!t) {
 				ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
 				continue;
 			}
 			if (!t->execute) {
 				ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
-				ast_task_free(t);
+				tps_task_free(t);
 				continue;
 			}
 			t->execute(t->datap);
@@ -304,7 +290,7 @@
 				}
 			}
 			ast_mutex_unlock(&i->taskprocessor_lock);
-			ast_task_free(t);
+			tps_task_free(t);
 			if (--size) 
 				continue;
 		}
@@ -313,24 +299,21 @@
 			ast_mutex_unlock(&i->taskprocessor_lock);
 			break;
 		}
-		if (!ast_taskprocessor_depth(i)) 
+		if (!tps_taskprocessor_depth(i)) 
 			ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
 		ast_mutex_unlock(&i->taskprocessor_lock);
 	}
-	while (ast_taskprocessor_depth(i)) {
+	while (tps_taskprocessor_depth(i)) {
 		/* stuff is in the queue */
-		t = ast_taskprocessor_pop(i);
+		t = tps_taskprocessor_pop(i);
 		if (t) {
-			ast_task_free(t);
+			tps_task_free(t);
 			t = NULL;
 		}
 	}
 	return NULL;
 }
 
-/*
- * hashing and comparison 
- */
 static int tps_hash_cb(const void *obj, const int flags)
 {
 	const struct ast_taskprocessor *tps = obj;
@@ -343,25 +326,6 @@
 	struct ast_taskprocessor *lhs = obj, *rhs = arg;
 
 	return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH : 0;
-}
-
-/*
- * Taskprocessor construction and destruction
- */
-static struct ast_taskprocessor *tps_default_constructor(void)
-{
-	struct ast_taskprocessor *tps;
-	
-	if (!(tps = ao2_alloc(sizeof(*tps), tps_taskprocessor_destroy))) {
-		return NULL;
-	}
-	ast_cond_init(&tps->poll_cond, NULL);
-	tps->poll_thread = AST_PTHREADT_NULL;
-	tps->stats = ast_calloc(1, sizeof(*tps->stats));
-	if (!tps->stats) {
-		return ast_taskprocessor_unreference(tps);
-	}
-	return tps;
 }
 
 static void tps_taskprocessor_destroy(void *tps)
@@ -372,7 +336,7 @@
 		ast_log(LOG_ERROR, "missing taskprocessor\n");
 		return;
 	}
-	ast_log(LOG_DEBUG, "destroying taskprocessor \'%s\'\n", t->name);
+	ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
 	/* kill it */	
 	ast_mutex_lock(&t->taskprocessor_lock);
 	t->poll_thread_run = 0;
@@ -389,10 +353,32 @@
 	return;
 }
 
+static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
+{
+	struct tps_task *task;
+
+	if (!tps) {
+		ast_log(LOG_ERROR, "missing taskprocessor\n");
+		return NULL;
+	}
+	ast_mutex_lock(&tps->taskprocessor_lock);
+	if ((task = AST_LIST_REMOVE_HEAD(&tps->queue, list))) {
+		tps->queue_size--;
+	}
+	ast_mutex_unlock(&tps->taskprocessor_lock);
+	return task;
+}
+
+static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+{
+	return (tps) ? tps->queue_size:-1;
+}
+
+
 /*
  * Taskprocessor service functions
  */
-char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
+char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
 {
 	if (!tps) {
 		ast_log(LOG_ERROR, "no taskprocessor specified!\n");
@@ -401,9 +387,8 @@
 	return tps->name;
 }
 
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_options create)
-{
-	int rc;
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create)
+{
 	struct ast_taskprocessor *p, tmp_tps = {
 		.name = name,
 	};
@@ -415,52 +400,40 @@
 	ast_mutex_lock(&tps_marshall);
 	p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
 	if (p) {
-		if ((create == TPS_REF_DEFAULT) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
-			ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
-			ao2_ref(p, -1);
-			ast_mutex_unlock(&tps_marshall);
-			return NULL;
-		}
 		ast_mutex_unlock(&tps_marshall);
 		return p;
 	}
-	if ((create & TPS_REF_IF_EXISTS) == TPS_REF_IF_EXISTS) {
+	if (create & TPS_REF_IF_EXISTS) {
 		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
 		ast_mutex_unlock(&tps_marshall);
 		return NULL;
 	}
 	/* create a new taskprocessor */
-	if ((p = tps_default_constructor()) == NULL) {
-		ast_log(LOG_ERROR, "The default taskprocessor object could not be created for reference \'%s\'\n", name);
-		ast_mutex_unlock(&tps_marshall);
+	if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
+		ast_mutex_unlock(&tps_marshall);
+		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
+		return NULL;
+	}
+	if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
+		ast_mutex_unlock(&tps_marshall);
+		ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
+		ao2_ref(p, -1);
 		return NULL;
 	}
 	p->name = ast_strdup(name);
 	p->poll_thread_run = 1;
-
-	/* compressing the 'if (custom_func)' blocks below into a single block, using something like:
-	 *
-	 * 		ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
-	 *
-	 * will result in uglier and less useful 'core show threads' output and you won't know if the default
-	 * processing function was used or not. */
-	if (custom_func) {
-		p->poll_function = custom_func;
-		rc = ast_pthread_create(&p->poll_thread, NULL, custom_func, p);
-	} else {
-		p->poll_function = tps_default_processor_function;
-		rc = ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p);
-	}
-	if (rc < 0) {
-		ast_log(LOG_ERROR, "Taskprocessor \'%s\' failed to create the processing thread.\n", p->name);
+	ast_cond_init(&p->poll_cond, NULL);
+	p->poll_thread = AST_PTHREADT_NULL;
+	if (ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p) < 0) {
+		ast_mutex_unlock(&tps_marshall);
+		ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
 		ao2_ref(p, -1);
-		ast_mutex_unlock(&tps_marshall);
 		return NULL;
 	}
 	if (!(ao2_link(tps_singletons, p))) {
-		ast_log(LOG_ERROR, "Failed to add taskprocessor \'%s\' to container\n", p->name);
+		ast_mutex_unlock(&tps_marshall);
+		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
 		ao2_ref(p, -1);
-		ast_mutex_unlock(&tps_marshall);
 		return NULL;
 	}
 	ast_mutex_unlock(&tps_marshall);
@@ -480,10 +453,16 @@
 	return NULL;
 }
 	
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t)
-{
-	if (!tps || !t) {
-		ast_log(LOG_ERROR, "missing \'%s\'\n", (tps)?"task":"taskprocessor");
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+	struct tps_task *t;
+
+	if (!tps || !task_exe) {
+		ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback":"taskprocessor");
+		return -1;
+	}
+	if (!(t = tps_task_alloc(task_exe, datap))) {
+		ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
 		return -1;
 	}
 	ast_mutex_lock(&tps->taskprocessor_lock);
@@ -494,29 +473,3 @@
 	return 0;
 }
 
-struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps)
-{
-	struct ast_task *t;
-
-	if (!tps) {
-		ast_log(LOG_ERROR, "missing taskprocessor\n");
-		return NULL;
-	}
-	ast_mutex_lock(&tps->taskprocessor_lock);
-	if ((t = AST_LIST_REMOVE_HEAD(&tps->queue, list))) {
-		tps->queue_size--;
-	}
-	ast_mutex_unlock(&tps->taskprocessor_lock);
-	return t;
-}
-
-int ast_taskprocessor_depth(struct ast_taskprocessor *tps)
-{
-	int size = -1;
-
-	if (tps) {
-		size = tps->queue_size;
-	}
-	return size;
-}
-




More information about the asterisk-commits mailing list