[asterisk-commits] dhubbard: branch group/taskprocessors r115192 - in /team/group/taskprocessors...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri May 2 01:23:02 CDT 2008
Author: dhubbard
Date: Fri May 2 01:23:01 2008
New Revision: 115192
URL: http://svn.digium.com/view/asterisk?view=rev&rev=115192
Log:
convert ast_task to tps_task
merge ast_task_alloc and ast_taskprocessor_push interfaces
ast_task_alloc, ast_task_free, ast_taskprocessor_depth, ast_taskprocessor_pop are now static and converted to tps_ functions
remove custom function specification option from ast_taskprocessor_get
Modified:
team/group/taskprocessors/apps/app_queue.c
team/group/taskprocessors/apps/app_voicemail.c
team/group/taskprocessors/channels/chan_sip.c
team/group/taskprocessors/include/asterisk/taskprocessor.h
team/group/taskprocessors/main/pbx.c
team/group/taskprocessors/main/taskprocessor.c
Modified: team/group/taskprocessors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_queue.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/apps/app_queue.c (original)
+++ team/group/taskprocessors/apps/app_queue.c Fri May 2 01:23:01 2008
@@ -132,7 +132,7 @@
{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
};
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *devicestate_tps;
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
@@ -791,7 +791,6 @@
enum ast_device_state state;
const char *device;
struct statechange *sc;
- void *task;
size_t datapsize;
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
@@ -808,11 +807,8 @@
}
sc->state = state;
strcpy(sc->dev, device);
- if (!(task = ast_task_alloc(handle_statechange, sc))) {
- ast_log(LOG_WARNING, "failed to allocate a task for a device state change notification\n");
- } else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
- ast_log(LOG_WARNING, "failed to push a device state change to the app_queue taskprocessor!!\n");
- ast_task_free(task);
+ if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
+ ast_free(sc);
}
}
@@ -6237,7 +6233,7 @@
queue_unref(q);
}
ao2_ref(queues, -1);
- taskprocessor = ast_taskprocessor_unreference(taskprocessor);
+ devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
return res;
}
@@ -6283,7 +6279,9 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- taskprocessor = ast_taskprocessor_get("app_queue", 0, 0);
+ if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
+ ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
+ }
if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
res = -1;
Modified: team/group/taskprocessors/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/apps/app_voicemail.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/apps/app_voicemail.c (original)
+++ team/group/taskprocessors/apps/app_voicemail.c Fri May 2 01:23:01 2008
@@ -629,7 +629,7 @@
uint32_t uniqueid;
};
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *mwi_subscription_tps;
static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
@@ -8782,7 +8782,6 @@
static void mwi_unsub_event_cb(const struct ast_event *event, void *userdata)
{
- struct ast_task *t;
uint32_t u, *uniqueid = ast_calloc(1, sizeof(*uniqueid));
if (ast_event_get_type(event) != AST_EVENT_UNSUB)
return;
@@ -8790,23 +8789,17 @@
if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI)
return;
- if (!taskprocessor) {
- ast_log(LOG_WARNING, "taskprocessor is still NULL\n");
- }
u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
*uniqueid = u;
- if (!(t = ast_task_alloc(handle_unsubscribe, uniqueid))) {
- ast_log(LOG_WARNING, "failed to allocate a task for a mwi unsubcribe event\n");
- } else if (ast_taskprocessor_push(taskprocessor, t) < 0) {
- ast_log(LOG_ERROR, "failed to push a voicemail mwi unsubscribe task to taskprocessor\n");
- ast_task_free(t);
+ if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
+ ast_free(uniqueid);
}
}
static void mwi_sub_event_cb(const struct ast_event *event, void *userdata)
{
struct mwi_sub_task *mwist;
- struct ast_task *t;
+
if (ast_event_get_type(event) != AST_EVENT_SUB)
return;
@@ -8821,11 +8814,8 @@
mwist->context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
- if (!(t = ast_task_alloc(handle_subscribe, mwist))) {
- ast_log(LOG_WARNING, "failed to allocate a task for a mwi subscribe event\n");
- } else if (ast_taskprocessor_push(taskprocessor, t) < 0) {
- ast_log(LOG_ERROR, "failed to push a voicemail mwi subscribe task to taskprocessor\n");
- ast_task_free(t);
+ if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
+ ast_free(mwist);
}
}
@@ -9649,7 +9639,7 @@
if (poll_thread != AST_PTHREADT_NULL)
stop_poll_thread();
- taskprocessor = ast_taskprocessor_unreference(taskprocessor);
+ mwi_subscription_tps = ast_taskprocessor_unreference(mwi_subscription_tps);
return res;
}
@@ -9662,8 +9652,9 @@
/* compute the location of the voicemail spool directory */
snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
- /* this should probably go into load_config() */
- taskprocessor = ast_taskprocessor_get("app_voicemail", 0, 0);
+ if (!(mwi_subscription_tps = ast_taskprocessor_get("app_voicemail", 0))) {
+ ast_log(LOG_WARNING, "failed to reference mwi subscription taskprocessor. MWI will not work\n");
+ }
if ((res = load_config(0)))
return res;
Modified: team/group/taskprocessors/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/channels/chan_sip.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/channels/chan_sip.c (original)
+++ team/group/taskprocessors/channels/chan_sip.c Fri May 2 01:23:01 2008
@@ -1321,7 +1321,7 @@
struct sip_st_dlg *stimer; /*!< SIP Session-Timers */
};
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *sipsock_tps;
struct sip_handle_request_task {
struct sip_request req;
struct sockaddr_in sin;
@@ -19114,7 +19114,6 @@
static int sipsock_read(int *id, int fd, short events, void *ignore)
{
struct sip_handle_request_task *taskdata;
- void *task;
int res;
socklen_t len;
static char readbuf[65535];
@@ -19150,12 +19149,10 @@
taskdata->req.socket.port = bindaddr.sin_port;
taskdata->req.socket.lock = NULL;
- if (!(task = ast_task_alloc(handle_request_do, taskdata))) {
- ast_log(LOG_WARNING, "failed to allocate a task for SIP message\n");
- } else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
- ast_log(LOG_WARNING, "failed to push a SIP message to taskprocessor\n");
- ast_task_free(task);
- }
+ if (ast_taskprocessor_push(sipsock_tps, handle_request_do, taskdata) < 0) {
+ ast_free(taskdata);
+ }
+
return 1;
}
@@ -22605,7 +22602,9 @@
ASTOBJ_CONTAINER_INIT(&regl); /* Registry object list -- not searched for anything */
/* this is not the best place to put this */
- taskprocessor = ast_taskprocessor_get("sipsock", 0, 0);
+ if (!(sipsock_tps = ast_taskprocessor_get("sipsock", 0))) {
+ ast_log(LOG_WARNING, "failed to create sipsock taskprocessor. SIP is not going to work\n");
+ }
/* the fact that ao2_containers can't resize automatically is a major worry! */
/* if the number of objects gets above MAX_XXX_BUCKETS, things will slow down */
users = ao2_t_container_alloc(hash_user_size, user_hash_cb, user_cmp_cb, "allocate users");
@@ -22692,7 +22691,7 @@
struct ao2_iterator i;
ast_sched_dump(sched);
- taskprocessor = ast_taskprocessor_unreference(taskprocessor);
+ sipsock_tps = ast_taskprocessor_unreference(sipsock_tps);
/* First, take us out of the channel type list */
ast_channel_unregister(&sip_tech);
Modified: team/group/taskprocessors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/include/asterisk/taskprocessor.h?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/include/asterisk/taskprocessor.h (original)
+++ team/group/taskprocessors/include/asterisk/taskprocessor.h Fri May 2 01:23:01 2008
@@ -30,21 +30,21 @@
*
* \author Dwayne M. Hubbard <dhubbard at digium.com>
*
- * \note A taskprocessor is a singleton processing thread that can be referenced
- * with a friendly name and shared across modules. Modules that reference a taskprocessor
- * can push ast_task objects into the default taskprocessor FIFO container. The default
- * taskprocessing function can be overridden at the time of taskprocessor creation using
- * the ast_taskprocessor_get() function. Taskprocessors utilize astobj2 and will
- * be created upon the first reference to a taskprocessor name, as well as destroy
- * themselves when the taskprocessor reference count reaches zero.
+ * \note A taskprocessor is a named singleton containing a processing thread and
+ * a task queue that serializes tasks pushed into it by one or more independent modules.
+ * A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get()
+ * function and destroyed when the taskprocessor reference count reaches zero.
*
- * The ast_task structure has a callback function, called execute(), that is called by the
- * taskprocessor when the task is popped off the queue. The execute() callback handler
- * is responsible for processing the task data and freeing task data resources, if necessary
- * and then setting the task datap pointer to NULL before returning. The taskprocessor will
- * free the task structure after the task callback returns control to the taskprocessor function.
+ * Modules that obtain a reference to a taskprocessor can queue tasks into the taskprocessor
+ * where tasks are processed by the singleton processing thread when the task reaches the front
+ * of the queue. A task is a wrapper around a task-handling function pointer and a data
+ * pointer. It is the responsibility of the task handling function to free memory allocated for
+ * the task data pointer. A task is obtained using the ast_task_alloc() and freed by the
+ * taskprocessor after the task has been processed. A module releases its reference to a
+ * taskprocessor using the ast_taskprocessor_unreference() function. Tasks waiting to be
+ * processed in the taskprocessor queue when the taskprocessor reference count reaches zero
+ * will be purged and released from the taskprocessor queue without being processed.
*/
-struct ast_task;
struct ast_taskprocessor;
/*! \brief ast_tps_options is used to specify whether a taskprocessor should be created
@@ -58,37 +58,16 @@
TPS_REF_IF_EXISTS = (1 << 0),
};
-/*! \brief Initialize the taskprocessor subsystem */
-int ast_tps_init(void);
-
-/*! \brief Allocate and initialize a task structure.
- * \param task_exe The task handling function that will be called when this task is popped
- * off the taskprocessor queue and executed.
- * \param datap The data to pass to the task_exe function when the task is executed
- * \param src The calling function name (this will become DEBUG only)
- * \return A task structure ready to be queued into a taskprocessor, NULL on error */
-struct ast_task *ast_task_alloc(int (*task_exe)(void *datap), void *datap);
-
-/*! \brief Free the task resources
- * \note This function does not free any memory pointed to by the data pointer. It is
- * the responsibility of the task handling function to free any resources managed by
- * a task's data pointer.
- * \param task The task to free
- * \return zero */
-int ast_task_free(struct ast_task *task);
-
/*! \brief Get a reference to a taskprocessor with the specified name; create a taskprocessor
- * if one with the specified name does not already exist. The default processing function
- * can be overridden using the second argument, custom_func, at the time of taskprocessor creation.
+ * if one with the specified name does not already exist.
* The default behavior of instantiating a taskprocessor if one does not already exist can be
* disabled by specifying the TPS_REF_IF_EXISTS reference type as the third argument to ast_taskprocessor_get().
* \param name The name of the taskprocessor
- * \param custom_func Use 0 by default or specify a custom taskprocessor thread function
* \param create Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does
* not already exist
* return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the
* TPS_REF_IF_EXISTS reference type is specified */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_options create);
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create);
/*! \brief Unreference the specified taskprocessor and its reference count will decrement.
* taskprocessors use astobj2 and will destroy themselves when their reference count reaches zero.
@@ -100,18 +79,7 @@
* \param tps The taskprocessor structure
* \param t The task to push into the taskprocessor queue
* \return zero on success, -1 on failure */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t);
-
-/*! \brief If a task is in the taskprocessor queue it will be popped from the queue
- * and its address will be returned. This function returns NULL if the queue is empty.
- * \param tps The taskprocessor structure
- * \return A task pointer if the queue is not empty, NULL when the queue is empty. */
-struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps);
-
-/*! \brief Return the number of tasks waiting in the taskprocessor queue
- * \param tps taskprocessor to return queue depth
- * \return depth on success, -1 on error */
-int ast_taskprocessor_depth(struct ast_taskprocessor *tps);
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
/*! \brief Return the name of the taskprocessor singleton */
char * ast_taskprocessor_name(struct ast_taskprocessor *tps);
Modified: team/group/taskprocessors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/pbx.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/main/pbx.c (original)
+++ team/group/taskprocessors/main/pbx.c Fri May 2 01:23:01 2008
@@ -124,7 +124,7 @@
struct ast_context;
struct ast_app;
-static struct ast_taskprocessor *taskprocessor;
+static struct ast_taskprocessor *device_state_tps;
AST_THREADSTORAGE(switch_data);
@@ -8044,7 +8044,6 @@
{
const char *device;
struct statechange *sc;
- void *task;
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
if (ast_strlen_zero(device)) {
@@ -8055,11 +8054,8 @@
if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
return;
strcpy(sc->dev, device);
- if (!(task = ast_task_alloc(handle_statechange, sc))) {
- ast_log(LOG_WARNING, "failed to allocate a task for a device state change notification\n");
- } else if (ast_taskprocessor_push(taskprocessor, task) < 0) {
- ast_log(LOG_WARNING, "failed to push a device state change notification to taskprocessor\n");
- ast_task_free(task);
+ if (ast_taskprocessor_push(device_state_tps, handle_statechange, sc) < 0) {
+ ast_free(sc);
}
}
@@ -8069,7 +8065,9 @@
/* Initialize the PBX */
ast_verb(1, "Asterisk PBX Core Initializing\n");
- taskprocessor = ast_taskprocessor_get("pbx-core", 0, 0);
+ if (!(device_state_tps = ast_taskprocessor_get("pbx-core", 0))) {
+ ast_log(LOG_WARNING, "failed to create pbx-core taskprocessor\n");
+ }
ast_verb(1, "Registering builtin applications:\n");
ast_cli_register_multiple(pbx_cli, sizeof(pbx_cli) / sizeof(struct ast_cli_entry));
Modified: team/group/taskprocessors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/main/taskprocessor.c?view=diff&rev=115192&r1=115191&r2=115192
==============================================================================
--- team/group/taskprocessors/main/taskprocessor.c (original)
+++ team/group/taskprocessors/main/taskprocessor.c Fri May 2 01:23:01 2008
@@ -24,6 +24,7 @@
*/
#include "asterisk.h"
+#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
@@ -34,22 +35,22 @@
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-/*! \brief ast_task structure is queued to a taskprocessor and released
+/*! \brief tps_task structure is queued to a taskprocessor and released
* by the taskprocessing thread. The callback function that is assigned
* to the execute() function pointer is responsible for releasing
* datap resources if necessary. */
-struct ast_task {
+struct tps_task {
/*! \brief The execute() task callback function pointer */
int (*execute)(void *datap);
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
- AST_LIST_ENTRY(ast_task) list;
+ AST_LIST_ENTRY(tps_task) list;
};
-/*! \brief ast_taskprocessor_stats are technically optional, but
+/*! \brief tps_taskprocessor_stats are technically optional, but
* used by default to keep track of statistics for an individual taskprocessor */
-struct ast_taskprocessor_stats {
+struct tps_taskprocessor_stats {
/*! \brief This is the maximum number of tasks queued at any one time */
unsigned long max_qsize;
/*! \brief This is the current number of tasks processed */
@@ -69,14 +70,12 @@
ast_mutex_t taskprocessor_lock;
/*! \brief Taskprocesor thread run flag */
unsigned char poll_thread_run;
- /*! \brief Hold a pointer to the taskprocessing function */
- void *(*poll_function)(void*);
/*! \brief Taskprocessor statistics */
- struct ast_taskprocessor_stats *stats;
+ struct tps_taskprocessor_stats *stats;
/*! \brief Taskprocessor current queue size */
long queue_size;
/*! \brief Taskprocessor queue */
- AST_LIST_HEAD_NOLOCK(queue, ast_task) queue;
+ AST_LIST_HEAD_NOLOCK(queue, tps_task) queue;
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
};
@@ -91,9 +90,10 @@
static int tps_hash_cb(const void *obj, const int flags);
static int tps_cmp_cb(void *obj, void *arg, int flags);
static void *tps_default_processor_function(void *data);
-static struct ast_taskprocessor *tps_default_constructor(void);
static void tps_taskprocessor_destroy(void *tps);
-static int tps_taskprocessor_ping_handler(void *datap);
+static int tps_ping_handler(void *datap);
+static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
+static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -103,25 +103,19 @@
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
};
-/*
- * Taskprocessor Subsystem
- */
int ast_tps_init(void)
{
- tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb);
- if (!tps_singletons)
- return AST_MODULE_LOAD_FAILURE;
-
+ if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
+ ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
+ return -1;
+ }
ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
return 0;
}
-/*
- * Task allocation & free
- */
-struct ast_task *ast_task_alloc(int (*task_exe)(void *datap), void *datap)
-{
- struct ast_task *t;
+static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
+{
+ struct tps_task *t;
if ((t = ast_calloc(1, sizeof(*t)))) {
t->execute = task_exe;
t->datap = datap;
@@ -129,12 +123,12 @@
return t;
}
-int ast_task_free(struct ast_task *task)
+static void *tps_task_free(struct tps_task *task)
{
if (task) {
ast_free(task);
}
- return 0;
+ return NULL;
}
/*
@@ -165,7 +159,7 @@
return name;
}
-static int tps_taskprocessor_ping_handler(void *datap)
+static int tps_ping_handler(void *datap)
{
ast_mutex_lock(&cli_ping_cond_lock);
ast_cond_signal(&cli_ping_cond);
@@ -179,7 +173,6 @@
char *name;
struct timeval tv;
struct timespec ts;
- struct ast_task *t = NULL;
struct ast_taskprocessor *tps = NULL;
switch (cmd) {
@@ -197,23 +190,17 @@
return CLI_SHOWUSAGE;
name = a->argv[2];
- if (!(tps = ast_taskprocessor_get(name, 0, TPS_REF_IF_EXISTS))) {
+ if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
return CLI_SUCCESS;
}
ast_cli(a->fd, "\npinging %s ...", name);
- if (!(t = ast_task_alloc(tps_taskprocessor_ping_handler, 0))) {
- ast_cli(a->fd, "\n\tfailed to allocate a task\n\n");
- ao2_ref(tps, -1);
- return CLI_FAILURE;
- }
tv = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
ast_mutex_lock(&cli_ping_cond_lock);
- if (ast_taskprocessor_push(tps, t) < 0) {
+ if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
- ast_task_free(t);
ao2_ref(tps, -1);
return CLI_FAILURE;
}
@@ -272,27 +259,26 @@
*/
static void *tps_default_processor_function(void *data)
{
- struct ast_taskprocessor *i;
- struct ast_task *t;
+ struct ast_taskprocessor *i = data;
+ struct tps_task *t;
int size;
- i = (struct ast_taskprocessor*)data;
if (!i) {
ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
return NULL;
}
while (i->poll_thread_run) {
- if ((size = ast_taskprocessor_depth(i)) > 0) {
+ if ((size = tps_taskprocessor_depth(i)) > 0) {
/* stuff is in the queue */
- t = ast_taskprocessor_pop(i);
+ t = tps_taskprocessor_pop(i);
if (!t) {
ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
continue;
}
if (!t->execute) {
ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
- ast_task_free(t);
+ tps_task_free(t);
continue;
}
t->execute(t->datap);
@@ -304,7 +290,7 @@
}
}
ast_mutex_unlock(&i->taskprocessor_lock);
- ast_task_free(t);
+ tps_task_free(t);
if (--size)
continue;
}
@@ -313,24 +299,21 @@
ast_mutex_unlock(&i->taskprocessor_lock);
break;
}
- if (!ast_taskprocessor_depth(i))
+ if (!tps_taskprocessor_depth(i))
ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
ast_mutex_unlock(&i->taskprocessor_lock);
}
- while (ast_taskprocessor_depth(i)) {
+ while (tps_taskprocessor_depth(i)) {
/* stuff is in the queue */
- t = ast_taskprocessor_pop(i);
+ t = tps_taskprocessor_pop(i);
if (t) {
- ast_task_free(t);
+ tps_task_free(t);
t = NULL;
}
}
return NULL;
}
-/*
- * hashing and comparison
- */
static int tps_hash_cb(const void *obj, const int flags)
{
const struct ast_taskprocessor *tps = obj;
@@ -343,25 +326,6 @@
struct ast_taskprocessor *lhs = obj, *rhs = arg;
return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH : 0;
-}
-
-/*
- * Taskprocessor construction and destruction
- */
-static struct ast_taskprocessor *tps_default_constructor(void)
-{
- struct ast_taskprocessor *tps;
-
- if (!(tps = ao2_alloc(sizeof(*tps), tps_taskprocessor_destroy))) {
- return NULL;
- }
- ast_cond_init(&tps->poll_cond, NULL);
- tps->poll_thread = AST_PTHREADT_NULL;
- tps->stats = ast_calloc(1, sizeof(*tps->stats));
- if (!tps->stats) {
- return ast_taskprocessor_unreference(tps);
- }
- return tps;
}
static void tps_taskprocessor_destroy(void *tps)
@@ -372,7 +336,7 @@
ast_log(LOG_ERROR, "missing taskprocessor\n");
return;
}
- ast_log(LOG_DEBUG, "destroying taskprocessor \'%s\'\n", t->name);
+ ast_log(LOG_DEBUG, "destroying taskprocessor '%s'\n", t->name);
/* kill it */
ast_mutex_lock(&t->taskprocessor_lock);
t->poll_thread_run = 0;
@@ -389,10 +353,32 @@
return;
}
+static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
+{
+ struct tps_task *task;
+
+ if (!tps) {
+ ast_log(LOG_ERROR, "missing taskprocessor\n");
+ return NULL;
+ }
+ ast_mutex_lock(&tps->taskprocessor_lock);
+ if ((task = AST_LIST_REMOVE_HEAD(&tps->queue, list))) {
+ tps->queue_size--;
+ }
+ ast_mutex_unlock(&tps->taskprocessor_lock);
+ return task;
+}
+
+static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+{
+ return (tps) ? tps->queue_size:-1;
+}
+
+
/*
* Taskprocessor service functions
*/
-char * ast_taskprocessor_name(struct ast_taskprocessor *tps)
+char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
{
if (!tps) {
ast_log(LOG_ERROR, "no taskprocessor specified!\n");
@@ -401,9 +387,8 @@
return tps->name;
}
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, void *(*custom_func)(void*), enum ast_tps_options create)
-{
- int rc;
+struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create)
+{
struct ast_taskprocessor *p, tmp_tps = {
.name = name,
};
@@ -415,52 +400,40 @@
ast_mutex_lock(&tps_marshall);
p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
if (p) {
- if ((create == TPS_REF_DEFAULT) && (p->poll_function != ((custom_func)?custom_func:tps_default_processor_function))) {
- ast_log(LOG_ERROR, "A taskprocessor \'%s\' already exists with a differing task processing function.\n", name);
- ao2_ref(p, -1);
- ast_mutex_unlock(&tps_marshall);
- return NULL;
- }
ast_mutex_unlock(&tps_marshall);
return p;
}
- if ((create & TPS_REF_IF_EXISTS) == TPS_REF_IF_EXISTS) {
+ if (create & TPS_REF_IF_EXISTS) {
/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
ast_mutex_unlock(&tps_marshall);
return NULL;
}
/* create a new taskprocessor */
- if ((p = tps_default_constructor()) == NULL) {
- ast_log(LOG_ERROR, "The default taskprocessor object could not be created for reference \'%s\'\n", name);
- ast_mutex_unlock(&tps_marshall);
+ if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
+ ast_mutex_unlock(&tps_marshall);
+ ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
+ return NULL;
+ }
+ if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
+ ast_mutex_unlock(&tps_marshall);
+ ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
+ ao2_ref(p, -1);
return NULL;
}
p->name = ast_strdup(name);
p->poll_thread_run = 1;
-
- /* compressing the 'if (custom_func)' blocks below into a single block, using something like:
- *
- * ast_pthread_create(&stuff, NULL, (custom_func)?eeep:mooo, p);
- *
- * will result in uglier and less useful 'core show threads' output and you won't know if the default
- * processing function was used or not. */
- if (custom_func) {
- p->poll_function = custom_func;
- rc = ast_pthread_create(&p->poll_thread, NULL, custom_func, p);
- } else {
- p->poll_function = tps_default_processor_function;
- rc = ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p);
- }
- if (rc < 0) {
- ast_log(LOG_ERROR, "Taskprocessor \'%s\' failed to create the processing thread.\n", p->name);
+ ast_cond_init(&p->poll_cond, NULL);
+ p->poll_thread = AST_PTHREADT_NULL;
+ if (ast_pthread_create(&p->poll_thread, NULL, tps_default_processor_function, p) < 0) {
+ ast_mutex_unlock(&tps_marshall);
+ ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
ao2_ref(p, -1);
- ast_mutex_unlock(&tps_marshall);
return NULL;
}
if (!(ao2_link(tps_singletons, p))) {
- ast_log(LOG_ERROR, "Failed to add taskprocessor \'%s\' to container\n", p->name);
+ ast_mutex_unlock(&tps_marshall);
+ ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
ao2_ref(p, -1);
- ast_mutex_unlock(&tps_marshall);
return NULL;
}
ast_mutex_unlock(&tps_marshall);
@@ -480,10 +453,16 @@
return NULL;
}
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, struct ast_task *t)
-{
- if (!tps || !t) {
- ast_log(LOG_ERROR, "missing \'%s\'\n", (tps)?"task":"taskprocessor");
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+ struct tps_task *t;
+
+ if (!tps || !task_exe) {
+ ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback":"taskprocessor");
+ return -1;
+ }
+ if (!(t = tps_task_alloc(task_exe, datap))) {
+ ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
return -1;
}
ast_mutex_lock(&tps->taskprocessor_lock);
@@ -494,29 +473,3 @@
return 0;
}
-struct ast_task *ast_taskprocessor_pop(struct ast_taskprocessor *tps)
-{
- struct ast_task *t;
-
- if (!tps) {
- ast_log(LOG_ERROR, "missing taskprocessor\n");
- return NULL;
- }
- ast_mutex_lock(&tps->taskprocessor_lock);
- if ((t = AST_LIST_REMOVE_HEAD(&tps->queue, list))) {
- tps->queue_size--;
- }
- ast_mutex_unlock(&tps->taskprocessor_lock);
- return t;
-}
-
-int ast_taskprocessor_depth(struct ast_taskprocessor *tps)
-{
- int size = -1;
-
- if (tps) {
- size = tps->queue_size;
- }
- return size;
-}
-
More information about the asterisk-commits
mailing list