[asterisk-commits] dhubbard: branch group/taskprocessors r111953 - in /team/group/taskprocessors...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sat Mar 29 15:23:21 CDT 2008
Author: dhubbard
Date: Sat Mar 29 15:23:20 2008
New Revision: 111953
URL: http://svn.digium.com/view/asterisk?view=rev&rev=111953
Log:
The taskproducer is really dead this time. There was much rejoicing.
Modified:
team/group/taskprocessors/apps/app_queue.c
team/group/taskprocessors/apps/app_voicemail.c
team/group/taskprocessors/channels/chan_sip.c
team/group/taskprocessors/include/asterisk/taskprocessor.h
team/group/taskprocessors/main/pbx.c
team/group/taskprocessors/main/taskprocessor.c
Modified: team/group/taskprocessors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_queue.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/apps/app_queue.c (original)
+++ team/group/taskprocessors/apps/app_queue.c Sat Mar 29 15:23:20 2008
@@ -132,7 +132,7 @@
{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
};
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
@@ -801,8 +801,8 @@
sc->state = state;
strcpy(sc->dev, device);
t = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb");
- if (taskproducer->queue_task(taskproducer, t) < 0) {
- ast_log(LOG_WARNING, "queue_task failed!!\n");
+ if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ ast_log(LOG_WARNING, "failed to push a device state change to the app_queue taskprocessor!!\n");
ast_task_free(t);
}
}
@@ -6168,7 +6168,7 @@
queue_unref(q);
}
ao2_ref(queues, -1);
- ao2_ref(taskproducer, -1);
+ ao2_ref(taskprocessor, -1);
return res;
}
@@ -6214,7 +6214,7 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- taskproducer = ast_taskproducer_alloc("app_queue");
+ taskprocessor = ast_taskprocessor_reference("app_queue", 0);
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
res = -1;
Modified: team/group/taskprocessors/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_voicemail.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/apps/app_voicemail.c (original)
+++ team/group/taskprocessors/apps/app_voicemail.c Sat Mar 29 15:23:20 2008
@@ -623,7 +623,7 @@
uint32_t uniqueid;
};
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
@@ -8172,14 +8172,14 @@
if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI)
return;
- if (!taskproducer) {
- ast_log(LOG_WARNING, "taskproducer is still NULL\n");
+ if (!taskprocessor) {
+ ast_log(LOG_WARNING, "taskprocessor is still NULL\n");
}
u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
*uniqueid = u;
t = ast_task_alloc(handle_unsubscribe, uniqueid, "app_voicemail, mwi_unsub_event_cb");
- if (taskproducer->queue_task(taskproducer, t) < 0) {
- ast_log(LOG_ERROR, "failed to queue task\n");
+ if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ ast_log(LOG_ERROR, "failed to push a voicemail mwi unsubscribe task to \'%s\'\n", ast_strdup(taskprocessor->_name));
ast_task_free(t);
}
}
@@ -8203,8 +8203,8 @@
mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
t = ast_task_alloc(handle_subscribe, mwist, "app_voicemail, mwi_sub_event_cb");
- if (taskproducer->queue_task(taskproducer, t) < 0) {
- ast_log(LOG_ERROR, "failed to queue task\n");
+ if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ ast_log(LOG_ERROR, "failed to push a voicemail mwi subscribe task to \'%s\'\n", ast_strdup(taskprocessor->_name));
ast_task_free(t);
}
@@ -9011,7 +9011,7 @@
if (poll_thread != AST_PTHREADT_NULL)
stop_poll_thread();
- ao2_ref(taskproducer, -1);
+ ao2_ref(taskprocessor, -1);
return res;
}
@@ -9025,7 +9025,7 @@
snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
/* this should probably go into load_config() */
- taskproducer = ast_taskproducer_alloc("app_voicemail");
+ taskprocessor = ast_taskprocessor_reference("app_voicemail", 0);
if ((res = load_config(0)))
return res;
Modified: team/group/taskprocessors/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/channels/chan_sip.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/channels/chan_sip.c (original)
+++ team/group/taskprocessors/channels/chan_sip.c Sat Mar 29 15:23:20 2008
@@ -1305,7 +1305,7 @@
struct sip_st_dlg *stimer; /*!< SIP Session-Timers */
};
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
struct sip_handle_request_task {
struct sip_request req;
struct sockaddr_in sin;
@@ -18122,8 +18122,10 @@
taskdata->req.socket.lock = NULL;
task = ast_task_alloc(handle_request_do, taskdata, "sipsock_read");
- taskproducer->queue_task(taskproducer, task);
-
+ if (ast_taskprocessor_push(taskprocessor, task) < 0) {
+ ast_log(LOG_WARNING, "failed to push a SIP message to \'%s\'\n", ast_strdup(taskprocessor->_name));
+ ast_task_free(task);
+ }
return 1;
}
@@ -21487,7 +21489,7 @@
ast_verbose("SIP channel loading...\n");
/* this is not the best place to put this */
- taskproducer = ast_taskproducer_alloc("sipsock");
+ taskprocessor = ast_taskprocessor_reference("sipsock", 0);
ASTOBJ_CONTAINER_INIT(&userl); /* User object list */
ASTOBJ_CONTAINER_INIT(&peerl); /* Peer object list */
@@ -21566,7 +21568,7 @@
struct sip_threadinfo *th;
struct ast_context *con;
- ao2_ref(taskproducer, -1);
+ ao2_ref(taskprocessor, -1);
/* First, take us out of the channel type list */
ast_channel_unregister(&sip_tech);
Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Sat Mar 29 15:23:20 2008
@@ -43,11 +43,6 @@
* is responsible for processing the task data and freeing task data resources, if necessary
* and then setting the task datap pointer to NULL before returning. The taskprocessor will
* free the task structure after the task callback returns control to the taskprocessor function.
- *
- * A taskproducer structure is a convenience object that can be embedded inside another object
- * and will provide the basic services for queuing tasks into a referenced taskprocessor.
- * The taskproducer objects utilize astobj2 and will destroy themselves when their reference
- * count reaches zero.
*/
/*! \brief a_task structure is queued to a taskprocessor and released
@@ -103,17 +98,6 @@
AST_LIST_ENTRY(taskprocessor_singleton_info) list;
};
-/*! \brief A taskproducer structure is for convenience and wraps a taskprocessor
- * and a queue_task operation. */
-struct ast_taskproducer {
- /*! \brief The taskprocessor that this taskproducer queues tasks to */
- struct taskprocessor_singleton_info *_taskprocessor;
- /*! \brief Counter of the number of tasks queued by this taskproducer */
- unsigned long _tasks_produced;
- /*! \brief The default queue_task() function */
- int (*queue_task)(struct ast_taskproducer *producer, struct a_task *task);
-};
-
/*! \brief Initialize the taskprocessor subsystem */
int ast_tps_init(void);
@@ -130,8 +114,5 @@
struct a_task *ast_taskprocessor_pop(struct taskprocessor_singleton_info *tp);
/*! \brief Return the number of a_task elements currently in the taskprocessor queue */
int ast_taskprocessor_depth(struct taskprocessor_singleton_info *tp);
-
-/*! \brief Allocate and initialize a taskproducer convenience object */
-struct ast_taskproducer *ast_taskproducer_alloc(const char *name);
#endif
Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Sat Mar 29 15:23:20 2008
@@ -125,7 +125,7 @@
struct ast_context;
struct ast_app;
-static struct ast_taskproducer *taskproducer;
+static struct taskprocessor_singleton_info *taskprocessor;
AST_THREADSTORAGE(switch_data);
@@ -7813,8 +7813,8 @@
return;
strcpy(sc->dev, device);
t = ast_task_alloc(handle_statechange, sc, "pbx-device_state_cb");
- if (taskproducer->queue_task(taskproducer, t) < 0) {
- ast_log(LOG_WARNING, "queue_task failed!\n");
+ if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ ast_log(LOG_WARNING, "failed to push a device state change notification to \'%s\'\n", ast_strdup(taskprocessor->_name));
ast_task_free(t);
}
}
@@ -7842,7 +7842,7 @@
/* Register manager application */
ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
- taskproducer = ast_taskproducer_alloc("pbx-core");
+ taskprocessor = ast_taskprocessor_reference("pbx-core", 0);
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
AST_EVENT_IE_END))) {
Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=111953&r1=111952&r2=111953
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Sat Mar 29 15:23:20 2008
@@ -43,7 +43,6 @@
static int tps_taskprocessor_add(struct taskprocessor_singleton_info *t);
static int tps_taskprocessor_count(void);
static void tps_taskprocessor_destroy(void *tps);
-static void tps_taskproducer_destroy(void *tp);
static int tps_taskprocessor_ping_handler(struct a_task* e);
static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -181,18 +180,13 @@
ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
return CLI_SUCCESS;
}
-
+ ast_cli(a->fd, "\npinging %s ...", name);
t = ast_task_alloc(tps_taskprocessor_ping_handler, 0, "cli_taskprocessor_ping");
begin = ast_tvnow();
if (ast_taskprocessor_push(tps, t) < 0) {
ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
ast_task_free(t);
}
- ast_cli(a->fd, "\npinging %s ...", name);
- ast_mutex_lock(&tps->_taskprocessor_lock);
- ast_cond_signal(&tps->_poll_cond);
- ast_mutex_unlock(&tps->_taskprocessor_lock);
-
ast_mutex_lock(&_cli_ping_cond_lock);
ast_cond_wait(&_cli_ping_cond, &_cli_ping_cond_lock);
ast_mutex_unlock(&_cli_ping_cond_lock);
@@ -502,6 +496,9 @@
tps->_queue_size += 1;
ast_mutex_unlock(&tps->_taskprocessor_lock);
AST_LIST_UNLOCK(&tps->_queue);
+ ast_mutex_lock(&tps->_taskprocessor_lock);
+ ast_cond_signal(&tps->_poll_cond);
+ ast_mutex_unlock(&tps->_taskprocessor_lock);
return 0;
}
@@ -560,52 +557,3 @@
return size;
}
-/*! \brief Provide a default implementation of a queue() helper function for a taskprocessor
- * \param producer wrapper around a taskprocessor and a queue() function
- * \return 0 on success, -1 on error
- */
-static int tps_default_queue_task(struct ast_taskproducer *producer, struct a_task *task)
-{
- if (!((producer)&&(producer->_taskprocessor)) || !task) {
- ast_log(LOG_ERROR, "this function requires a proper \'%s\' structure.\n", (task)?"producer":"task");
- return -1;
- }
- if (ast_taskprocessor_push(producer->_taskprocessor, task) < 0) {
- ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", producer->_taskprocessor->_name);
- return -1;
- }
- producer->_tasks_produced++;
- ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
- ast_cond_signal(&producer->_taskprocessor->_poll_cond);
- ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
- return 0;
-}
-
-/*! \brief Allocate and initialize a taskproducer structure with a default queue_task helper function
- * \param name of taskprocessor to reference
- * \return taskproducer on success, NULL on error
- */
-struct ast_taskproducer *ast_taskproducer_alloc(const char *name)
-{
- struct ast_taskproducer *p;
- p = ao2_alloc(sizeof(*p), tps_taskproducer_destroy);
- if (!p) {
- ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
- return NULL;
- }
- p->_taskprocessor = ast_taskprocessor_reference(name, 0);
- p->queue_task = tps_default_queue_task;
- return p;
-}
-
-/*! \brief Free taskproducer resources
- * \param tp taskproducer
- * \return void
- */
-static void tps_taskproducer_destroy(void *tp)
-{
- struct ast_taskproducer *p;
- p = (struct ast_taskproducer *)tp;
- ao2_ref(p->_taskprocessor, -1);
-}
-
More information about the asterisk-commits
mailing list