[svn-commits] dhubbard: branch group/taskprocessors r111953 - in /team/group/taskprocessors...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sat Mar 29 15:23:21 CDT 2008


Author: dhubbard
Date: Sat Mar 29 15:23:20 2008
New Revision: 111953

URL: http://svn.digium.com/view/asterisk?view=rev&rev=111953
Log:
The taskproducer is really dead this time. There was much rejoicing.

Modified:
    team/group/taskprocessors/apps/app_queue.c
    team/group/taskprocessors/apps/app_voicemail.c
    team/group/taskprocessors/channels/chan_sip.c
    team/group/taskprocessors/include/asterisk/taskprocessor.h
    team/group/taskprocessors/main/pbx.c
    team/group/taskprocessors/main/taskprocessor.c

Modified: team/group/taskprocessors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_queue.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/apps/app_queue.c (original)
+++ team/group/taskprocessors/apps/app_queue.c Sat Mar 29 15:23:20 2008
@@ -132,7 +132,7 @@
 	{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
 };
 
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
 
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
@@ -801,8 +801,8 @@
 	sc->state = state;
 	strcpy(sc->dev, device);
 	t = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb");
-	if (taskproducer->queue_task(taskproducer, t) < 0) {
-		ast_log(LOG_WARNING, "queue_task failed!!\n");
+	if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+		ast_log(LOG_WARNING, "failed to push a device state change to the app_queue taskprocessor!!\n");
 		ast_task_free(t);
 	}
 }
@@ -6168,7 +6168,7 @@
 		queue_unref(q);
 	}
 	ao2_ref(queues, -1);
-	ao2_ref(taskproducer, -1);
+	ao2_ref(taskprocessor, -1);
 	return res;
 }
 
@@ -6214,7 +6214,7 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	taskproducer = ast_taskproducer_alloc("app_queue");
+	taskprocessor = ast_taskprocessor_reference("app_queue", 0);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
 		res = -1;

Modified: team/group/taskprocessors/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_voicemail.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/apps/app_voicemail.c (original)
+++ team/group/taskprocessors/apps/app_voicemail.c Sat Mar 29 15:23:20 2008
@@ -623,7 +623,7 @@
 	uint32_t uniqueid;
 };
 
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
 
 static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
 
@@ -8172,14 +8172,14 @@
 	if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI)
 		return;
 
-	if (!taskproducer) {
-		ast_log(LOG_WARNING, "taskproducer is still NULL\n");
+	if (!taskprocessor) {
+		ast_log(LOG_WARNING, "taskprocessor is still NULL\n");
 	}
 	u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
 	*uniqueid = u;
 	t = ast_task_alloc(handle_unsubscribe, uniqueid, "app_voicemail, mwi_unsub_event_cb");
-	if (taskproducer->queue_task(taskproducer, t) < 0) {
-		ast_log(LOG_ERROR, "failed to queue task\n");
+	if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+		ast_log(LOG_ERROR, "failed to push a voicemail mwi unsubscribe task to \'%s\'\n", ast_strdup(taskprocessor->_name));
 		ast_task_free(t);
 	}
 }
@@ -8203,8 +8203,8 @@
 	mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
 	
 	t = ast_task_alloc(handle_subscribe, mwist, "app_voicemail, mwi_sub_event_cb");
-	if (taskproducer->queue_task(taskproducer, t) < 0) {
-		ast_log(LOG_ERROR, "failed to queue task\n");
+	if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+		ast_log(LOG_ERROR, "failed to push a voicemail mwi subscribe task to \'%s\'\n", ast_strdup(taskprocessor->_name));
 		ast_task_free(t);
 	}
 		
@@ -9011,7 +9011,7 @@
 	if (poll_thread != AST_PTHREADT_NULL)
 		stop_poll_thread();
 
-	ao2_ref(taskproducer, -1);
+	ao2_ref(taskprocessor, -1);
 	return res;
 }
 
@@ -9025,7 +9025,7 @@
 	snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
 	
 	/* this should probably go into load_config() */
-	taskproducer = ast_taskproducer_alloc("app_voicemail");
+	taskprocessor = ast_taskprocessor_reference("app_voicemail", 0);
 
 	if ((res = load_config(0)))
 		return res;

Modified: team/group/taskprocessors/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/channels/chan_sip.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/channels/chan_sip.c (original)
+++ team/group/taskprocessors/channels/chan_sip.c Sat Mar 29 15:23:20 2008
@@ -1305,7 +1305,7 @@
 	struct sip_st_dlg *stimer;		/*!< SIP Session-Timers */              
 };
 
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
 struct sip_handle_request_task {
 	struct sip_request req;
 	struct sockaddr_in sin;
@@ -18122,8 +18122,10 @@
 	taskdata->req.socket.lock = NULL;
 
 	task = ast_task_alloc(handle_request_do, taskdata, "sipsock_read");
-	taskproducer->queue_task(taskproducer, task);
-
+	if (ast_taskprocessor_push(taskprocessor, task) < 0) {
+		ast_log(LOG_WARNING, "failed to push a SIP message to \'%s\'\n", ast_strdup(taskprocessor->_name));
+		ast_task_free(task);
+	}
 	return 1;
 }
 
@@ -21487,7 +21489,7 @@
 	ast_verbose("SIP channel loading...\n");
 
 	/* this is not the best place to put this */
-	taskproducer = ast_taskproducer_alloc("sipsock");
+	taskprocessor = ast_taskprocessor_reference("sipsock", 0);
 
 	ASTOBJ_CONTAINER_INIT(&userl);	/* User object list */
 	ASTOBJ_CONTAINER_INIT(&peerl);	/* Peer object list */
@@ -21566,7 +21568,7 @@
 	struct sip_threadinfo *th;
 	struct ast_context *con;
 
-	ao2_ref(taskproducer, -1);
+	ao2_ref(taskprocessor, -1);
 
 	/* First, take us out of the channel type list */
 	ast_channel_unregister(&sip_tech);

Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Sat Mar 29 15:23:20 2008
@@ -43,11 +43,6 @@
  * is responsible for processing the task data and freeing task data resources, if necessary
  * and then setting the task datap pointer to NULL before returning.  The taskprocessor will 
  * free the task structure after the task callback returns control to the taskprocessor function.
- *
- * A taskproducer structure is a convenience object that can be embedded inside another object
- * and will provide the basic services for queuing tasks into a referenced taskprocessor.
- * The taskproducer objects utilize astobj2 and will destroy themselves when their reference
- * count reaches zero.
  */
 
 /*! \brief a_task structure is queued to a taskprocessor and released
@@ -103,17 +98,6 @@
 	AST_LIST_ENTRY(taskprocessor_singleton_info) list;
 };
 
-/*! \brief A taskproducer structure is for convenience and wraps a taskprocessor
- * and a queue_task operation. */
-struct ast_taskproducer {
-	/*! \brief The taskprocessor that this taskproducer queues tasks to */ 
-	struct taskprocessor_singleton_info *_taskprocessor;
-	/*! \brief Counter of the number of tasks queued by this taskproducer */
-	unsigned long _tasks_produced;
-	/*! \brief The default queue_task() function */
-	int (*queue_task)(struct ast_taskproducer *producer, struct a_task *task);
-}; 
-
 /*! \brief Initialize the taskprocessor subsystem */
 int ast_tps_init(void);
 
@@ -130,8 +114,5 @@
 struct a_task *ast_taskprocessor_pop(struct taskprocessor_singleton_info *tp);
 /*! \brief Return the number of a_task elements currently in the taskprocessor queue */
 int ast_taskprocessor_depth(struct taskprocessor_singleton_info *tp);
-
-/*! \brief Allocate and initialize a taskproducer convenience object */
-struct ast_taskproducer *ast_taskproducer_alloc(const char *name);
 #endif
 

Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Sat Mar 29 15:23:20 2008
@@ -125,7 +125,7 @@
 struct ast_context;
 struct ast_app;
 
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
 
 AST_THREADSTORAGE(switch_data);
 
@@ -7813,8 +7813,8 @@
 		return;
 	strcpy(sc->dev, device);
 	t = ast_task_alloc(handle_statechange, sc, "pbx-device_state_cb");
-	if (taskproducer->queue_task(taskproducer, t) < 0) {
-		ast_log(LOG_WARNING, "queue_task failed!\n");
+	if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+		ast_log(LOG_WARNING, "failed to push a device state change notification to \'%s\'\n", ast_strdup(taskprocessor->_name));
 		ast_task_free(t);
 	}
 }
@@ -7842,7 +7842,7 @@
 	/* Register manager application */
 	ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
 
-	taskproducer = ast_taskproducer_alloc("pbx-core");
+	taskprocessor = ast_taskprocessor_reference("pbx-core", 0);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
 			AST_EVENT_IE_END))) {

Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Sat Mar 29 15:23:20 2008
@@ -43,7 +43,6 @@
 static int tps_taskprocessor_add(struct taskprocessor_singleton_info *t);
 static int tps_taskprocessor_count(void);
 static void tps_taskprocessor_destroy(void *tps);
-static void tps_taskproducer_destroy(void *tp);
 static int tps_taskprocessor_ping_handler(struct a_task* e);
 
 static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -181,18 +180,13 @@
 		ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
 		return CLI_SUCCESS;
 	}
-
+	ast_cli(a->fd, "\npinging %s ...", name);
 	t = ast_task_alloc(tps_taskprocessor_ping_handler, 0, "cli_taskprocessor_ping");
 	begin = ast_tvnow();
 	if (ast_taskprocessor_push(tps, t) < 0) {
 		ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
 		ast_task_free(t);
 	}
-	ast_cli(a->fd, "\npinging %s ...", name);
-	ast_mutex_lock(&tps->_taskprocessor_lock);
-	ast_cond_signal(&tps->_poll_cond);
-	ast_mutex_unlock(&tps->_taskprocessor_lock);
-	
 	ast_mutex_lock(&_cli_ping_cond_lock);
 	ast_cond_wait(&_cli_ping_cond, &_cli_ping_cond_lock);
 	ast_mutex_unlock(&_cli_ping_cond_lock);
@@ -502,6 +496,9 @@
 	tps->_queue_size += 1;
 	ast_mutex_unlock(&tps->_taskprocessor_lock);
 	AST_LIST_UNLOCK(&tps->_queue);
+	ast_mutex_lock(&tps->_taskprocessor_lock);
+	ast_cond_signal(&tps->_poll_cond);
+	ast_mutex_unlock(&tps->_taskprocessor_lock);
 	return 0;
 }
 
@@ -560,52 +557,3 @@
 	return size;
 }
 
-/*! \brief Provide a default implementation of a queue() helper function for a taskprocessor
- * \param producer wrapper around a taskprocessor and a queue() function
- * \return 0 on success, -1 on error
- */
-static int tps_default_queue_task(struct ast_taskproducer *producer, struct a_task *task)
-{
-	if (!((producer)&&(producer->_taskprocessor)) || !task) {
-		ast_log(LOG_ERROR, "this function requires a proper \'%s\' structure.\n", (task)?"producer":"task");
-		return -1;
-	}
-	if (ast_taskprocessor_push(producer->_taskprocessor, task) < 0) {
-		ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", producer->_taskprocessor->_name);
-		return -1;
-	}
-	producer->_tasks_produced++;
-	ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
-	ast_cond_signal(&producer->_taskprocessor->_poll_cond);
-	ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
-	return 0;
-}
-
-/*! \brief Allocate and initialize a taskproducer structure with a default queue_task helper function
- * \param name of taskprocessor to reference
- * \return taskproducer on success, NULL on error
- */
-struct ast_taskproducer *ast_taskproducer_alloc(const char *name)
-{
-	struct ast_taskproducer *p;
-	p = ao2_alloc(sizeof(*p), tps_taskproducer_destroy);
-	if (!p) {
-		ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
-		return NULL;
-	}
-	p->_taskprocessor = ast_taskprocessor_reference(name, 0);
-	p->queue_task = tps_default_queue_task;
-	return p;
-}
-
-/*! \brief Free taskproducer resources
- * \param tp taskproducer
- * \return void
- */
-static void tps_taskproducer_destroy(void *tp)
-{
-	struct ast_taskproducer *p;
-	p = (struct ast_taskproducer *)tp;
-	ao2_ref(p->_taskprocessor, -1);
-}
-




More information about the svn-commits mailing list