[asterisk-commits] dhubbard: branch group/taskprocessors r112122 - in /team/group/taskprocessors...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Mar 31 23:27:35 CDT 2008
Author: dhubbard
Date: Mon Mar 31 23:27:34 2008
New Revision: 112122
URL: http://svn.digium.com/view/asterisk?view=rev&rev=112122
Log:
A bunch of changes made after reviewing the branch with Russell on the giant screen. The next step is ao2 containers.
Modified:
team/group/taskprocessors/apps/app_queue.c
team/group/taskprocessors/apps/app_voicemail.c
team/group/taskprocessors/channels/chan_sip.c
team/group/taskprocessors/include/asterisk/taskprocessor.h
team/group/taskprocessors/main/pbx.c
team/group/taskprocessors/main/taskprocessor.c
Modified: team/group/taskprocessors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_queue.c?view=diff&rev=112122&r1=112121&r2=112122
==============================================================================
--- team/group/taskprocessors/apps/app_queue.c (original)
+++ team/group/taskprocessors/apps/app_queue.c Mon Mar 31 23:27:34 2008
@@ -732,12 +732,12 @@
}
/*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(struct ast_task *task)
+static int handle_statechange(void *datap)
{
struct member_interface *curint;
char *loc;
char *technology;
- struct statechange *sc = task->_datap;
+ struct statechange *sc = datap;
technology = ast_strdupa(sc->dev);
loc = strchr(technology, '/');
@@ -745,7 +745,6 @@
*loc++ = '\0';
} else {
ast_free(sc);
- task->_datap = NULL;
return 0;
}
@@ -774,7 +773,6 @@
update_status(sc->dev, sc->state);
ast_free(sc);
- task->_datap = NULL;
return 0;
}
@@ -783,7 +781,7 @@
enum ast_device_state state;
const char *device;
struct statechange *sc;
- struct ast_task* t;
+ void *task;
size_t datapsize;
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
@@ -800,10 +798,11 @@
}
sc->state = state;
strcpy(sc->dev, device);
- t = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb");
- if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ if (!(task = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb"))) {
+ ast_log(LOG_WARNING, "failed to allocate a task for a device state change notification\n");
+ } else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
ast_log(LOG_WARNING, "failed to push a device state change to the app_queue taskprocessor!!\n");
- ast_task_free(t);
+ ast_task_free(task);
}
}
@@ -6168,7 +6167,7 @@
queue_unref(q);
}
ao2_ref(queues, -1);
- ast_taskprocessor_unreference(taskprocessor);
+ taskprocessor = ast_taskprocessor_unreference(taskprocessor);
return res;
}
@@ -6214,7 +6213,7 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- taskprocessor = ast_taskprocessor_reference("app_queue", 0);
+ taskprocessor = ast_taskprocessor_get("app_queue", 0, 0);
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
res = -1;
Modified: team/group/taskprocessors/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_voicemail.c?view=diff&rev=112122&r1=112121&r2=112122
==============================================================================
--- team/group/taskprocessors/apps/app_voicemail.c (original)
+++ team/group/taskprocessors/apps/app_voicemail.c Mon Mar 31 23:27:34 2008
@@ -8107,10 +8107,10 @@
ast_free(mwi_sub);
}
-static int handle_unsubscribe(struct ast_task *task)
+static int handle_unsubscribe(void *datap)
{
struct mwi_sub *mwi_sub;
- uint32_t *uniqueid = (uint32_t *)task->_datap;
+ uint32_t *uniqueid = datap;
AST_RWLIST_WRLOCK(&mwi_subs);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) {
@@ -8126,15 +8126,14 @@
mwi_sub_destroy(mwi_sub);
ast_free(uniqueid);
- task->_datap = NULL;
return 0;
}
-static int handle_subscribe(struct ast_task *task)
+static int handle_subscribe(void *datap)
{
unsigned int len;
struct mwi_sub *mwi_sub;
- struct mwi_sub_task *p = task->_datap;
+ struct mwi_sub_task *p = datap;
len = sizeof(*mwi_sub);
if (!ast_strlen_zero(p->mailbox))
@@ -8159,7 +8158,6 @@
AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry);
AST_RWLIST_UNLOCK(&mwi_subs);
ast_free(p);
- task->_datap = NULL;
return 0;
}
@@ -8178,9 +8176,10 @@
}
u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
*uniqueid = u;
- t = ast_task_alloc(handle_unsubscribe, uniqueid, "app_voicemail, mwi_unsub_event_cb");
- if (ast_taskprocessor_push(taskprocessor, t) < 0) {
- ast_log(LOG_ERROR, "failed to push a voicemail mwi unsubscribe task to \'%s\'\n", ast_strdup(taskprocessor->_name));
+ if (!(t = ast_task_alloc(handle_unsubscribe, uniqueid, "app_voicemail, mwi_unsub_event_cb"))) {
+ ast_log(LOG_WARNING, "failed to allocate a task for a mwi unsubcribe event\n");
+ } else if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ ast_log(LOG_ERROR, "failed to push a voicemail mwi unsubscribe task to taskprocessor\n");
ast_task_free(t);
}
}
@@ -8203,12 +8202,12 @@
mwist->context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
- t = ast_task_alloc(handle_subscribe, mwist, "app_voicemail, mwi_sub_event_cb");
- if (ast_taskprocessor_push(taskprocessor, t) < 0) {
- ast_log(LOG_ERROR, "failed to push a voicemail mwi subscribe task to \'%s\'\n", ast_strdup(taskprocessor->_name));
+ if (!(t = ast_task_alloc(handle_subscribe, mwist, "app_voicemail, mwi_sub_event_cb"))) {
+ ast_log(LOG_WARNING, "failed to allocate a task for a mwi subscribe event\n");
+ } else if (ast_taskprocessor_push(taskprocessor, t) < 0) {
+ ast_log(LOG_ERROR, "failed to push a voicemail mwi subscribe task to taskprocessor\n");
ast_task_free(t);
}
-
}
static void start_poll_thread(void)
@@ -9012,7 +9011,7 @@
if (poll_thread != AST_PTHREADT_NULL)
stop_poll_thread();
- ast_taskprocessor_unreference(taskprocessor);
+ taskprocessor = ast_taskprocessor_unreference(taskprocessor);
return res;
}
@@ -9026,7 +9025,7 @@
snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
/* this should probably go into load_config() */
- taskprocessor = ast_taskprocessor_reference("app_voicemail", 0);
+ taskprocessor = ast_taskprocessor_get("app_voicemail", 0, 0);
if ((res = load_config(0)))
return res;
Modified: team/group/taskprocessors/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/channels/chan_sip.c?view=diff&rev=112122&r1=112121&r2=112122
==============================================================================
--- team/group/taskprocessors/channels/chan_sip.c (original)
+++ team/group/taskprocessors/channels/chan_sip.c Mon Mar 31 23:27:34 2008
@@ -1704,7 +1704,7 @@
static int sip_queryoption(struct ast_channel *chan, int option, void *data, int *datalen);
static const char *sip_get_callid(struct ast_channel *chan);
-static int handle_request_do(struct ast_task *task);
+static int handle_request_do(void *datap);
static int sip_standard_port(struct sip_socket s);
static int sip_prepare_socket(struct sip_pvt *p);
@@ -18085,7 +18085,7 @@
static int sipsock_read(int *id, int fd, short events, void *ignore)
{
struct sip_handle_request_task *taskdata;
- struct ast_task *task;
+ void *task;
int res;
socklen_t len;
static char readbuf[65535];
@@ -18121,23 +18121,24 @@
taskdata->req.socket.port = bindaddr.sin_port;
taskdata->req.socket.lock = NULL;
- task = ast_task_alloc(handle_request_do, taskdata, "sipsock_read");
- if (ast_taskprocessor_push(taskprocessor, task) < 0) {
- ast_log(LOG_WARNING, "failed to push a SIP message to \'%s\'\n", ast_strdup(taskprocessor->_name));
+ if (!(task = ast_task_alloc(handle_request_do, taskdata, "sipsock_read"))) {
+ ast_log(LOG_WARNING, "failed to allocate a task for SIP message\n");
+ } else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
+ ast_log(LOG_WARNING, "failed to push a SIP message to taskprocessor\n");
ast_task_free(task);
}
return 1;
}
-static int handle_request_do(struct ast_task *task)
+static int handle_request_do(void *datap)
{
struct sip_pvt *p;
int recount = 0;
int nounlock = 0;
int lockretry;
- struct sip_handle_request_task *tomato = (struct sip_handle_request_task *)task->_datap;
- struct sip_request *req = (struct sip_request *)&tomato->req;
- struct sockaddr_in *sin = (struct sockaddr_in *)&tomato->sin;
+ struct sip_handle_request_task *willy = datap;
+ struct sip_request *req = (struct sip_request *)&willy->req;
+ struct sockaddr_in *sin = (struct sockaddr_in *)&willy->sin;
if (sip_debug_test_addr(sin)) /* Set the debug flag early on packet level */
req->debug = 1;
@@ -18160,9 +18161,7 @@
ast_free(req->data);
req->data = NULL;
}
- ast_free(task->_datap);
- task->_datap = NULL;
- ast_task_free(task);
+ ast_free(willy);
return 1;
}
@@ -18179,9 +18178,7 @@
ast_free(req->data);
req->data = NULL;
}
- ast_free(task->_datap);
- task->_datap = NULL;
- ast_task_free(task);
+ ast_free(willy);
return 1;
}
@@ -18214,9 +18211,7 @@
ast_free(req->data);
req->data = NULL;
}
- ast_free(task->_datap);
- task->_datap = NULL;
- ast_task_free(task);
+ ast_free(willy);
return 1;
}
@@ -18238,9 +18233,7 @@
ast_free(req->data);
req->data = NULL;
}
- ast_free(task->_datap);
- task->_datap = NULL;
- ast_task_free(task);
+ ast_free(willy);
return 1;
}
@@ -21489,7 +21482,7 @@
ast_verbose("SIP channel loading...\n");
/* this is not the best place to put this */
- taskprocessor = ast_taskprocessor_reference("sipsock", 0);
+ taskprocessor = ast_taskprocessor_get("sipsock", 0, 0);
ASTOBJ_CONTAINER_INIT(&userl); /* User object list */
ASTOBJ_CONTAINER_INIT(&peerl); /* Peer object list */
@@ -21568,7 +21561,7 @@
struct sip_threadinfo *th;
struct ast_context *con;
- ast_taskprocessor_unreference(taskprocessor);
+ taskprocessor = ast_taskprocessor_unreference(taskprocessor);
/* First, take us out of the channel type list */
ast_channel_unregister(&sip_tech);
Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=112122&r1=112121&r2=112122
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Mon Mar 31 23:27:34 2008
@@ -34,7 +34,7 @@
* with a friendly name and shared across modules. Modules that reference a taskprocessor
* can push ast_task objects into the default taskprocessor FIFO container. The default
* taskprocessing function can be overridden at the time of taskprocessor creation using
- * the ast_taskprocessor_reference() function. Taskprocessors utilize astobj2 and will
+ * the ast_taskprocessor_get() function. Taskprocessors utilize astobj2 and will
* be created upon the first reference to a taskprocessor name, as well as destroy
* themselves when the taskprocessor reference count reaches zero.
*
@@ -44,78 +44,42 @@
* and then setting the task datap pointer to NULL before returning. The taskprocessor will
* free the task structure after the task callback returns control to the taskprocessor function.
*/
+struct ast_task;
+struct ast_taskprocessor;
-/*! \brief ast_task structure is queued to a taskprocessor and released
- * by the taskprocessing thread. The callback function that is assigned
- * to the execute() function pointer is responsible for releasing
- * _datap resources if necessary. */
-struct ast_task {
- /*! \brief The execute() task callback function pointer */
- int (*execute)(struct ast_task *t);
- /*! \brief The data pointer for the task execute() function */
- void *_datap;
- /*! \brief The function that allocated a task object (To become debug only) */
- char _source[256];
- /*! \brief AST_LIST_ENTRY overhead */
- AST_LIST_ENTRY(ast_task) list;
-};
-
-/*! \brief ast_taskprocessor_stats are technically optional, but
- * used by default to keep track of statistics for an individual taskprocessor */
-struct ast_taskprocessor_stats {
- /*! \brief This is the maximum number of tasks queued at any one time */
- unsigned long _max_qsize;
- /*! \brief This is the current number of tasks processed */
- unsigned long _tasks_processed_count;
-};
-
-/*! \brief A ast_taskprocessor structure is used to manage
- * a unique taskprocessor thread */
-struct ast_taskprocessor {
- /*! \brief Friendly name of the taskprocessor */
- char _name[80];
- /*! \brief Address of the taskprocessor singleton structure */
- unsigned long _id;
- /*! \brief Thread poll condition */
- ast_cond_t _poll_cond;
- /*! \brief Taskprocessor thread */
- pthread_t _poll_thread;
- /*! \brief Taskprocessor lock */
- ast_mutex_t _taskprocessor_lock;
- /*! \brief Taskprocesor thread run flag */
- unsigned char _poll_thread_run;
- /*! \brief Hold a pointer to the taskprocessing function */
- void *(*_poll_function)(void*);
- /*! \brief Taskprocessor statistics */
- struct ast_taskprocessor_stats *_stats;
- /*! \brief private data for a taskprocessor */
- void *_private;
- /*! \brief Taskprocessor current queue size */
- long _queue_size;
- /*! \brief Taskprocessor queue */
- AST_LIST_HEAD(_queue, ast_task) _queue;
- /*! \brief Taskprocessor singleton list entry */
- AST_LIST_ENTRY(ast_taskprocessor) list;
+/*! \brief ast_tps_reftype is used to specify whether a taskprocessor should be created
+ * on an attempt to ast_taskprocessor_get() if the taskprocessor does not already exist.
+ * The default behavior is to create a taskprocessor if it does not already exist and provide
+ * a reference to the taskprocessor if it already exists. */
+enum ast_tps_reftype {
+ /*! \brief return a reference to a taskprocessor, create one if it does not exist */
+ TPS_REF_DEF = 0
+ /*! \brief return a reference to a taskprocessor ONLY if it already exists */
+ , TPS_REF_IF_EXISTS
+ /* add new entries above this comment */
+ , TPS_REF_ENUM_SIZE
};
/*! \brief Initialize the taskprocessor subsystem */
int ast_tps_init(void);
/*! \brief Allocate ast_task object */
-struct ast_task *ast_task_alloc(int (*task_exe)(struct ast_task *task), void *datap, char *src);
+struct ast_task *ast_task_alloc(int (*task_exe)(void *datap), void *datap, char *src);
/*! \brief Release ast_task resources */
int ast_task_free(struct ast_task *task);
/*! \brief Obtain a taskprocessor reference */
-struct ast_taskprocessor *ast_taskprocessor_reference(const char *name, void *(*func)(void*));
+struct ast_taskprocessor *ast_taskprocessor_get(const char *name, void *(*custom_func)(void*), enum ast_tps_reftype create);
/*! \brief Release a taskprocessor reference */
-int ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
+void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
/*! \brief Push a task on the taskprocessor */
-int ast_taskprocessor_push(struct ast_taskprocessor *tp, struct ast_task *t);
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t);
/*! \brief Pop the front task off the taskprocessor queue and return it to the calling function */
-struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tp);
+struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps);
/*! \brief Return the number of ast_task elements currently in the taskprocessor queue */
-int ast_taskprocessor_depth(struct ast_taskprocessor *tp);
+int ast_taskprocessor_depth(struct ast_taskprocessor *tps);
+/*! \brief Return the name of the taskprocessor singleton */
+char * ast_taskprocessor_name(struct ast_taskprocessor *tps);
#endif
Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=112122&r1=112121&r2=112122
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Mon Mar 31 23:27:34 2008
@@ -2933,10 +2933,10 @@
return ast_extension_state2(e); /* Check all devices in the hint */
}
-static int handle_statechange(struct ast_task *task)
+static int handle_statechange(void *datap)
{
struct ast_hint *hint;
- struct statechange *sc = task->_datap;
+ struct statechange *sc = datap;
AST_RWLIST_RDLOCK(&hints);
AST_RWLIST_TRAVERSE(&hints, hint, list) {
@@ -2976,7 +2976,6 @@
}
AST_RWLIST_UNLOCK(&hints);
ast_free(sc);
- task->_datap = NULL;
return 0;
}
@@ -7801,7 +7800,7 @@
{
const char *device;
struct statechange *sc;
- struct ast_task *t;
+ void *task;
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
if (ast_strlen_zero(device)) {
@@ -7812,10 +7811,11 @@
if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
return;
strcpy(sc->dev, device);
- t = ast_task_alloc(handle_statechange, sc, "pbx-device_state_cb");
- if (ast_taskprocessor_push(taskprocessor, t) < 0) {
- ast_log(LOG_WARNING, "failed to push a device state change notification to \'%s\'\n", ast_strdup(taskprocessor->_name));
- ast_task_free(t);
+ if (!(task = ast_task_alloc(handle_statechange, sc, "pbx-device_state_cb"))) {
+ ast_log(LOG_WARNING, "failed to allocate a task for a device state change notification\n");
+ } else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
+ ast_log(LOG_WARNING, "failed to push a device state change notification to taskprocessor\n");
+ ast_task_free(task);
}
}
@@ -7842,7 +7842,7 @@
/* Register manager application */
ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
- taskprocessor = ast_taskprocessor_reference("pbx-core", 0);
+ taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
AST_EVENT_IE_END))) {
Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=112122&r1=112121&r2=112122
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Mon Mar 31 23:27:34 2008
@@ -23,27 +23,76 @@
* \author Dwayne Hubbard <dhubbard at digium.com>
*/
-#include <asterisk.h>
-#include <asterisk/time.h>
-#include <asterisk/astobj2.h>
-#include <asterisk/cli.h>
-#include <asterisk/taskprocessor.h>
-#include <signal.h>
-#include <sys/time.h>
+#include "asterisk.h"
+#include "asterisk/time.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/cli.h"
+#include "asterisk/taskprocessor.h"
+#include "signal.h"
+#include "sys/time.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-AST_LIST_HEAD_STATIC(_taskprocessor_singletons, ast_taskprocessor);
-static int _taskprocessor_singletons_list_size = 0;
-static ast_cond_t _cli_ping_cond;
-static ast_mutex_t _cli_ping_cond_lock;
+/*! \brief ast_task structure is queued to a taskprocessor and released
+ * by the taskprocessing thread. The callback function that is assigned
+ * to the execute() function pointer is responsible for releasing
+ * datap resources if necessary. */
+struct ast_task {
+ /*! \brief The execute() task callback function pointer */
+ int (*execute)(void *datap);
+ /*! \brief The data pointer for the task execute() function */
+ void *datap;
+ /*! \brief The function that allocated a task object (To become debug only) */
+ char source[256];
+ /*! \brief AST_LIST_ENTRY overhead */
+ AST_LIST_ENTRY(ast_task) list;
+};
+
+/*! \brief ast_taskprocessor_stats are technically optional, but
+ * used by default to keep track of statistics for an individual taskprocessor */
+struct ast_taskprocessor_stats {
+ /*! \brief This is the maximum number of tasks queued at any one time */
+ unsigned long max_qsize;
+ /*! \brief This is the current number of tasks processed */
+ unsigned long _tasks_processed_count;
+};
+
+/*! \brief A ast_taskprocessor structure is used to manage
+ * a unique taskprocessor thread */
+struct ast_taskprocessor {
+ /*! \brief Friendly name of the taskprocessor */
+ char name[80];
+ /*! \brief Thread poll condition */
+ ast_cond_t poll_cond;
+ /*! \brief Taskprocessor thread */
+ pthread_t poll_thread;
+ /*! \brief Taskprocessor lock */
+ ast_mutex_t taskprocessor_lock;
+ /*! \brief Taskprocesor thread run flag */
+ unsigned char poll_thread_run;
+ /*! \brief Hold a pointer to the taskprocessing function */
+ void *(*poll_function)(void*);
+ /*! \brief Taskprocessor statistics */
+ struct ast_taskprocessor_stats *stats;
+ /*! \brief Taskprocessor current queue size */
+ long queue_size;
+ /*! \brief Taskprocessor queue */
+ AST_LIST_HEAD(queue, ast_task) queue;
+ /*! \brief Taskprocessor singleton list entry */
+ AST_LIST_ENTRY(ast_taskprocessor) list;
+};
+
+AST_LIST_HEAD_STATIC(taskprocessor_singletons, ast_taskprocessor);
+static int taskprocessor_singletons_list_size = 0;
+static ast_cond_t cli_ping_cond;
+static ast_mutex_t cli_ping_cond_lock;
static void *tps_default_processor_function(void *data);
static struct ast_taskprocessor *tps_default_constructor(void);
static int tps_taskprocessor_add(struct ast_taskprocessor *t);
static int tps_taskprocessor_count(void);
static void tps_taskprocessor_destroy(void *tps);
-static int tps_taskprocessor_ping_handler(struct ast_task* e);
+static int tps_taskprocessor_ping_handler(void *datap);
static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -59,7 +108,7 @@
*/
int ast_tps_init(void)
{
- ast_cli_register_multiple(taskprocessor_clis, sizeof(taskprocessor_clis)/sizeof(taskprocessor_clis[0]));
+ ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
return 0;
}
@@ -69,17 +118,14 @@
* \param src where the task came from, this is going to be debug only
* \return A task prepared to be queued into a taskprocessor
*/
-struct ast_task *ast_task_alloc(int (*task_exe)(struct ast_task *task), void *datap, char *src)
+struct ast_task *ast_task_alloc(int (*task_exe)(void *datap), void *datap, char *src)
{
struct ast_task *t;
- t = ast_calloc(1,sizeof(*t));
- if (!t) {
- ast_log(LOG_ERROR, "Failed to allocate memory for \'%s\'\n", src);
- return NULL;
- }
- t->execute = task_exe;
- t->_datap = datap;
- strncpy(t->_source, src, sizeof(t->_source));
+ if ((t = ast_calloc(1, sizeof(*t)))) {
+ t->execute = task_exe;
+ t->datap = datap;
+ ast_copy_string(t->source, src, sizeof(t->source));
+ }
return t;
}
@@ -89,8 +135,8 @@
*/
int ast_task_free(struct ast_task *task)
{
- if (task->_datap) {
- ast_free(task->_datap);
+ if (task) {
+ ast_free(task);
}
return 0;
}
@@ -104,38 +150,32 @@
{
int tklen;
int wordnum = 0;
- char *name;
-
- if(a->pos != 2)
+ char *name = NULL;
+
+ if (a->pos != 2)
return NULL;
tklen = strlen(a->word);
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
- if (!strncasecmp(a->word, p->_name, tklen) && ++wordnum > a->n) {
- name = ast_strdup(p->_name);
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- return name;
- break;
- }
- }
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- return NULL;
+ AST_LIST_LOCK(&taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
+ if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
+ name = ast_strdup(p->name);
+ break;
+ }
+ }
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
+ return name;
}
/*! \brief CLI 'taskprocessor ping <blah>' operation handler
* \param task the ping task queued by the CLI operation
* \return 0 on success, -1 on error
*/
-static int tps_taskprocessor_ping_handler(struct ast_task *task)
-{
- if (!task) {
- ast_log(LOG_ERROR, "Huh? There is no task? This is terribly vexing!\n");
- return -1;
- }
- ast_mutex_lock(&_cli_ping_cond_lock);
- ast_cond_signal(&_cli_ping_cond);
- ast_mutex_unlock(&_cli_ping_cond_lock);
+static int tps_taskprocessor_ping_handler(void *datap)
+{
+ ast_mutex_lock(&cli_ping_cond_lock);
+ ast_cond_signal(&cli_ping_cond);
+ ast_mutex_unlock(&cli_ping_cond_lock);
return 0;
}
@@ -149,7 +189,6 @@
{
struct timeval begin, end, delta;
char *name;
- int found = 0;
struct ast_task *t = NULL;
struct ast_taskprocessor *tps = NULL;
@@ -168,31 +207,24 @@
return CLI_SHOWUSAGE;
name = a->argv[2];
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, tps, list) {
- ast_mutex_lock(&tps->_taskprocessor_lock);
- if (!strcasecmp(tps->_name, name)) {
- found = 1;
- }
- ast_mutex_unlock(&tps->_taskprocessor_lock);
- if (found) break;
- }
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
-
- if (!found || !tps) {
+ if (!(tps = ast_taskprocessor_get(name, 0, TPS_REF_IF_EXISTS))) {
ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
return CLI_SUCCESS;
}
ast_cli(a->fd, "\npinging %s ...", name);
- t = ast_task_alloc(tps_taskprocessor_ping_handler, 0, "cli_taskprocessor_ping");
+ if (!(t = ast_task_alloc(tps_taskprocessor_ping_handler, 0, "cli_taskprocessor_ping"))) {
+ ast_cli(a->fd, "\n\tfailed to allocate a task\n\n");
+ return CLI_FAILURE;
+ }
begin = ast_tvnow();
+ ast_mutex_lock(&cli_ping_cond_lock);
if (ast_taskprocessor_push(tps, t) < 0) {
ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
ast_task_free(t);
- }
- ast_mutex_lock(&_cli_ping_cond_lock);
- ast_cond_wait(&_cli_ping_cond, &_cli_ping_cond_lock);
- ast_mutex_unlock(&_cli_ping_cond_lock);
+ return CLI_FAILURE;
+ }
+ ast_cond_wait(&cli_ping_cond, &cli_ping_cond_lock);
+ ast_mutex_unlock(&cli_ping_cond_lock);
end = ast_tvnow();
delta = ast_tvsub(end, begin);
ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, delta.tv_sec, (long int)delta.tv_usec);
@@ -228,24 +260,24 @@
return CLI_SHOWUSAGE;
ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
- ast_mutex_lock(&p->_taskprocessor_lock);
- snprintf(name, sizeof(name), "%s", p->_name);
- qsize = p->_queue_size;
- maxqsize = p->_stats->_max_qsize;
- processed = p->_stats->_tasks_processed_count;
- ast_mutex_unlock(&p->_taskprocessor_lock);
+ AST_LIST_LOCK(&taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
+ ast_mutex_lock(&p->taskprocessor_lock);
+ ast_copy_string(name, p->name, sizeof(name));
+ qsize = p->queue_size;
+ maxqsize = p->stats->max_qsize;
+ processed = p->stats->_tasks_processed_count;
+ ast_mutex_unlock(&p->taskprocessor_lock);
ast_cli(a->fd, "\n%24s %17ld %12ld %12ld", name, processed, qsize, maxqsize);
}
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
ast_cli(a->fd, "\n\n");
return CLI_SUCCESS;
}
/*! \brief The default taskprocessing thread pops tasks off its queue and executes it
* \param data ast_taskprocessor structure for the named thread
- * \note The default taskprocessor thread function can be overridden via ast_taskprocessor_reference()
+ * \note The default taskprocessor thread function can be overridden via ast_taskprocessor_get()
* \return NULL
*/
static void *tps_default_processor_function(void *data)
@@ -260,7 +292,7 @@
return NULL;
}
- while (i->_poll_thread_run) {
+ while (i->poll_thread_run) {
if ((size = ast_taskprocessor_depth(i)) > 0) {
/* stuff is in the queue */
t = ast_taskprocessor_pop(i);
@@ -269,29 +301,30 @@
continue;
}
if (!t->execute) {
- ast_log(LOG_WARNING, "A task from '\%s\' has no function to execute\n", t->_source);
+ ast_log(LOG_WARNING, "A task from '\%s\' has no function to execute\n", t->source);
ast_task_free(t);
continue;
}
- t->execute(t);
- ast_mutex_lock(&i->_taskprocessor_lock);
- if (i->_stats) {
- i->_stats->_tasks_processed_count++;
- if (size > i->_stats->_max_qsize) {
- i->_stats->_max_qsize = size;
+ t->execute(t->datap);
+ ast_mutex_lock(&i->taskprocessor_lock);
+ if (i->stats) {
+ i->stats->_tasks_processed_count++;
+ if (size > i->stats->max_qsize) {
+ i->stats->max_qsize = size;
}
}
- ast_mutex_unlock(&i->_taskprocessor_lock);
+ ast_mutex_unlock(&i->taskprocessor_lock);
ast_task_free(t);
- if (--size) continue;
- }
- ast_mutex_lock(&i->_taskprocessor_lock);
- if (!i->_poll_thread_run) {
- ast_mutex_unlock(&i->_taskprocessor_lock);
+ if (--size)
+ continue;
+ }
+ ast_mutex_lock(&i->taskprocessor_lock);
+ if (!i->poll_thread_run) {
+ ast_mutex_unlock(&i->taskprocessor_lock);
break;
}
- ast_cond_wait(&i->_poll_cond, &i->_taskprocessor_lock);
- ast_mutex_unlock(&i->_taskprocessor_lock);
+ ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
+ ast_mutex_unlock(&i->taskprocessor_lock);
}
while (ast_taskprocessor_depth(i)) {
/* stuff is in the queue */
@@ -313,19 +346,26 @@
struct ast_taskprocessor *tps;
tps = ao2_alloc(sizeof(*tps), tps_taskprocessor_destroy);
if (!tps) {
- ast_log(LOG_ERROR, "cannot allocate memory for a ast_taskprocessor structure.\n");
- return NULL;
- }
- tps->_id = (unsigned long)tps;
- ast_cond_init(&tps->_poll_cond, NULL);
- tps->_poll_thread = AST_PTHREADT_NULL;
- tps->_stats = ast_calloc(1, sizeof(*tps->_stats));
- if (!tps->_stats) {
- ast_log(LOG_ERROR, "cannot allocate memory for a ast_taskprocessor_stats structure.\n");
- ast_taskprocessor_unreference(tps);
- return NULL;
+ return NULL;
+ }
+ ast_cond_init(&tps->poll_cond, NULL);
+ tps->poll_thread = AST_PTHREADT_NULL;
+ tps->stats = ast_calloc(1, sizeof(*tps->stats));
+ if (!tps->stats) {
+ return ast_taskprocessor_unreference(tps);
}
return tps;
+}
+
+/*! \brief Return the name of a taskprocessor singleton
+ * \param tps pointer to a taskprocessor singleton */
+char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
+{
+ if (!tps) {
+ ast_log(LOG_ERROR, "no taskprocessor specified!\n");
+ return NULL;
+ }
+ return tps->name;
}
/*! \brief Return a pointer to the taskprocessor singleton structure and create it if necessary.
@@ -333,7 +373,7 @@
* \param custom_func the function executed by the taskprocessor thread
* \return ast_taskprocessor pointer on success, NULL on error
*/
-struct ast_taskprocessor *ast_taskprocessor_reference(const char *name, void *(*custom_func)(void*))
+struct ast_taskprocessor *ast_taskprocessor_get(const char *name, void *(*custom_func)(void*), enum ast_tps_reftype create)
{
int rc;
struct ast_taskprocessor *p;
@@ -342,71 +382,62 @@
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
return NULL;
}
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
- if (!strcasecmp(p->_name, name)) {
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- ast_debug(5, "taskprocessor_singleton \'%s\' already exists!.\n", p->_name);
- if (p->_poll_function != ((custom_func)?custom_func:tps_default_processor_function)) {
+ AST_LIST_LOCK(&taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&taskprocessor_singletons, p, list) {
+ if (!strcasecmp(p->name, name)) {
+ ast_debug(5, "taskprocessor_singleton \'%s\' already exists!.\n", p->name);
+ if ((create == TPS_REF_DEF) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
return NULL;
}
ao2_ref(p, 1);
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
return p;
}
}
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ if (create == TPS_REF_IF_EXISTS) {
+ /* we were told not to create a taskprocessor if it didn't exist, so lets take a holiday */
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
+ return NULL;
+ }
if ((p = tps_default_constructor()) == NULL) {
ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
- return NULL;
- }
- snprintf(p->_name, sizeof(p->_name), "%s", name);
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
+ return NULL;
+ }
+ ast_copy_string(p->name, name, sizeof(p->name));
if (tps_taskprocessor_add(p) < 0) {
- ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\' with ID: 0x%X\n", p->_name, (unsigned int)p->_id);
- ast_taskprocessor_unreference(p);
- return NULL;
- }
- p->_poll_thread_run = 1;
+ ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\'\n", p->name);
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
+ return ast_taskprocessor_unreference(p);
+ }
+ p->poll_thread_run = 1;
ast_debug(5, "creating taskprocessor \'%s\', taskprocessor count: %d\n", name, tps_taskprocessor_count());
- /* stay stopped if we are supposed to be stopped */
- if (p->_poll_thread == AST_PTHREADT_STOP) {
- ast_taskprocessor_unreference(p);
- return NULL;
- }
- if (p->_poll_thread == pthread_self()) {
- ast_taskprocessor_unreference(p);
- return NULL;
- }
- if (p->_poll_thread != AST_PTHREADT_NULL) {
- /* wake it up */
- pthread_kill(p->_poll_thread, SIGURG);
+ /* The logic for creating the processor thread in the block below may seem silly,
+ * but doing it this way results in more useful 'core show threads' output */
+ if (custom_func) {
+ p->poll_function = custom_func;
+ rc = ast_pthread_create(&p->poll_thread, NULL, custom_func, p);
} else {
- /* The logic for creating the processor thread in the block below may seem silly,
- * but doing it this way results in more useful 'core show threads' output */
- if (custom_func) {
- p->_poll_function = custom_func;
- rc = ast_pthread_create(&p->_poll_thread, NULL, custom_func, p);
- } else {
- p->_poll_function = tps_default_processor_function;
- rc = ast_pthread_create(&p->_poll_thread, NULL, tps_default_processor_function, p);
- }
- if (rc < 0) {
- ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
- ast_taskprocessor_unreference(p);
- return NULL;
- }
- }
+ p->poll_function = tps_default_processor_function;
+ rc = ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p);
+ }
+ if (rc < 0) {
+ ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->name);
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
+ return ast_taskprocessor_unreference(p);
+ }
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
return p;
}
-int ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
-{
- if (!tps) {
- ast_log(LOG_ERROR, "You must specify the taskprocessor that you want to unreference\n");
- return -1;
- }
- ao2_ref(tps, -1);
- return 0;
+void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
+{
+ if (tps) {
+ ao2_ref(tps, -1);
+ }
+ return NULL;
}
/*! \brief The taskprocessor destructor is called by astobj2 when the reference count reaches zero
@@ -415,34 +446,24 @@
*/
static void tps_taskprocessor_destroy(void *tps)
{
- struct ast_taskprocessor *n;
- struct ast_taskprocessor *t = (struct ast_taskprocessor *)tps;
-
- ast_debug(5, "destroying taskprocessor \'%s\'\n", t->_name);
- if (!t) {
- ast_log(LOG_ERROR, "can't destruct a NULL taskprocessor_singleton.\n");
- return;
- }
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
- if (n == t) {
- AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
- _taskprocessor_singletons_list_size -= 1;
- ast_debug(5, "taskprocessor_singleton \'%s\' removed.\n", t->_name);
- break;
- }
- }
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- ast_mutex_lock(&t->_taskprocessor_lock);
- t->_poll_thread_run = 0;
- ast_cond_signal(&t->_poll_cond);
- ast_mutex_unlock(&t->_taskprocessor_lock);
- pthread_join(t->_poll_thread, NULL);
- t->_poll_thread = AST_PTHREADT_NULL;
- if (t->_stats) {
- ast_free(t->_stats);
- t->_stats = NULL;
- }
+ struct ast_taskprocessor *t = tps;
+
+ ast_debug(5, "destroying taskprocessor \'%s\'\n", t->name);
+ AST_LIST_LOCK(&taskprocessor_singletons);
+ AST_LIST_REMOVE(&taskprocessor_singletons, t, list);
+ taskprocessor_singletons_list_size--;
+ ast_debug(5, "taskprocessor_singleton \'%s\' removed.\n", t->name);
+ ast_mutex_lock(&t->taskprocessor_lock);
+ t->poll_thread_run = 0;
+ ast_cond_signal(&t->poll_cond);
+ ast_mutex_unlock(&t->taskprocessor_lock);
+ pthread_join(t->poll_thread, NULL);
+ t->poll_thread = AST_PTHREADT_NULL;
+ if (t->stats) {
+ ast_free(t->stats);
+ t->stats = NULL;
+ }
+ AST_LIST_UNLOCK(&taskprocessor_singletons);
return;
}
@@ -452,19 +473,8 @@
*/
static int tps_taskprocessor_add(struct ast_taskprocessor *t)
{
- struct ast_taskprocessor *n = NULL;
-
- AST_LIST_LOCK(&_taskprocessor_singletons);
- AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
- if (!strcasecmp(n->_name, t->_name)) {
- ast_log(LOG_WARNING, "A taskprocessor named \'%s\' already exists\n", t->_name);
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- return -1;
- }
- }
- AST_LIST_INSERT_TAIL(&_taskprocessor_singletons, t, list);
- _taskprocessor_singletons_list_size += 1;
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ AST_LIST_INSERT_TAIL(&taskprocessor_singletons, t, list);
+ taskprocessor_singletons_list_size++;
return 0;
}
@@ -474,11 +484,7 @@
*/
static int tps_taskprocessor_count(void)
{
- int size;
- AST_LIST_LOCK(&_taskprocessor_singletons);
- size = _taskprocessor_singletons_list_size;
- AST_LIST_UNLOCK(&_taskprocessor_singletons);
- return size;
+ return taskprocessor_singletons_list_size;
}
/*! \brief Push a task into the taskprocessor queue
@@ -494,24 +500,22 @@
ast_log(LOG_ERROR, "A \'%s\' is required for this function.\n", (tps)?"task":"taskprocessor");
return -1;
}
- AST_LIST_LOCK(&tps->_queue);
- while (ast_mutex_trylock(&tps->_taskprocessor_lock)) {
+ AST_LIST_LOCK(&tps->queue);
+ while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
lock_failures++;
- AST_LIST_UNLOCK(&tps->_queue);
+ AST_LIST_UNLOCK(&tps->queue);
usleep(1);
if (lock_failures > 10) {
- ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->_name);
+ ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
return -1;
}
- AST_LIST_LOCK(&tps->_queue);
- }
- AST_LIST_INSERT_TAIL(&tps->_queue, t, list);
- tps->_queue_size += 1;
- ast_mutex_unlock(&tps->_taskprocessor_lock);
- AST_LIST_UNLOCK(&tps->_queue);
- ast_mutex_lock(&tps->_taskprocessor_lock);
- ast_cond_signal(&tps->_poll_cond);
- ast_mutex_unlock(&tps->_taskprocessor_lock);
+ AST_LIST_LOCK(&tps->queue);
+ }
+ AST_LIST_INSERT_TAIL(&tps->queue, t, list);
+ tps->queue_size++;
+ AST_LIST_UNLOCK(&tps->queue);
+ ast_cond_signal(&tps->poll_cond);
+ ast_mutex_unlock(&tps->taskprocessor_lock);
return 0;
}
@@ -524,23 +528,23 @@
struct ast_task *t = NULL;
int lock_failures = 0;
- AST_LIST_LOCK(&tps->_queue);
- while (ast_mutex_trylock(&tps->_taskprocessor_lock)) {
+ AST_LIST_LOCK(&tps->queue);
+ while (ast_mutex_trylock(&tps->taskprocessor_lock)) {
lock_failures++;
- AST_LIST_UNLOCK(&tps->_queue);
+ AST_LIST_UNLOCK(&tps->queue);
usleep(1);
if (lock_failures > 10) {
- ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->_name);
+ ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->name);
return t;
}
- AST_LIST_LOCK(&tps->_queue);
- }
- if (!AST_LIST_EMPTY(&tps->_queue)) {
[... 47 lines stripped ...]
More information about the asterisk-commits mailing list