[asterisk-commits] russell: branch russell/sched_thread r143120 - in /team/russell/sched_thread:...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sun Sep 14 19:07:38 CDT 2008
Author: russell
Date: Sun Sep 14 19:07:38 2008
New Revision: 143120
URL: http://svn.digium.com/view/asterisk?view=rev&rev=143120
Log:
Merge some changes to the taskprocessor API ...
- s/taskprocessor/taskproc/ because I'm lazy
- s/unreference/unref/ again, because I'm lazy
- Add some new stuff - ast_taskproc_pool
- This allows you to create a pool of task processors.
- When you queue tasks up to a pool of task processors, they are currently
distributed across the processors in round-robin order. However, I plan to
add a way that lets you customize the distribution logic.
Modified:
team/russell/sched_thread/apps/app_queue.c
team/russell/sched_thread/apps/app_voicemail.c
team/russell/sched_thread/include/asterisk/taskprocessor.h
team/russell/sched_thread/main/event.c
team/russell/sched_thread/main/pbx.c
team/russell/sched_thread/main/taskprocessor.c
Modified: team/russell/sched_thread/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/apps/app_queue.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/apps/app_queue.c (original)
+++ team/russell/sched_thread/apps/app_queue.c Sun Sep 14 19:07:38 2008
@@ -132,7 +132,7 @@
{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
};
-static struct ast_taskprocessor *devicestate_tps;
+static struct ast_taskproc *devicestate_tps;
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
@@ -778,7 +778,7 @@
}
sc->state = state;
strcpy(sc->dev, device);
- if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
+ if (ast_taskproc_push(devicestate_tps, handle_statechange, sc) < 0) {
ast_free(sc);
}
}
@@ -6367,7 +6367,7 @@
queue_unref(q);
}
ao2_ref(queues, -1);
- devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
+ devicestate_tps = ast_taskproc_unref(devicestate_tps);
ast_unload_realtime("queue_members");
return res;
}
@@ -6414,7 +6414,7 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
+ if (!(devicestate_tps = ast_taskproc_get("app_queue", 0))) {
ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
}
Modified: team/russell/sched_thread/apps/app_voicemail.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/apps/app_voicemail.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/apps/app_voicemail.c (original)
+++ team/russell/sched_thread/apps/app_voicemail.c Sun Sep 14 19:07:38 2008
@@ -639,7 +639,7 @@
uint32_t uniqueid;
};
-static struct ast_taskprocessor *mwi_subscription_tps;
+static struct ast_taskproc *mwi_subscription_tps;
static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
@@ -9746,7 +9746,7 @@
u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
*uniqueid = u;
- if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
+ if (ast_taskproc_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
ast_free(uniqueid);
}
}
@@ -9769,7 +9769,7 @@
mwist->context = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT));
mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
- if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
+ if (ast_taskproc_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
ast_free(mwist);
}
}
@@ -10651,7 +10651,7 @@
if (poll_thread != AST_PTHREADT_NULL)
stop_poll_thread();
- mwi_subscription_tps = ast_taskprocessor_unreference(mwi_subscription_tps);
+ mwi_subscription_tps = ast_taskproc_unref(mwi_subscription_tps);
ast_unload_realtime("voicemail");
ast_unload_realtime("voicemail_data");
@@ -10669,7 +10669,7 @@
/* compute the location of the voicemail spool directory */
snprintf(VM_SPOOL_DIR, sizeof(VM_SPOOL_DIR), "%s/voicemail/", ast_config_AST_SPOOL_DIR);
- if (!(mwi_subscription_tps = ast_taskprocessor_get("app_voicemail", 0))) {
+ if (!(mwi_subscription_tps = ast_taskproc_get("app_voicemail", 0))) {
ast_log(AST_LOG_WARNING, "failed to reference mwi subscription taskprocessor. MWI will not work\n");
}
Modified: team/russell/sched_thread/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/include/asterisk/taskprocessor.h?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/include/asterisk/taskprocessor.h (original)
+++ team/russell/sched_thread/include/asterisk/taskprocessor.h Sun Sep 14 19:07:38 2008
@@ -1,9 +1,10 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 2007-2008, Dwayne M. Hubbard
+ * Copyright (C) 2007 - 2008, Digium, Inc.
*
* Dwayne M. Hubbard <dhubbard at digium.com>
+ * Russell Bryant <russell at digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
@@ -15,24 +16,25 @@
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
-#include "asterisk.h"
+
+#ifndef __AST_TASKPROC_H__
+#define __AST_TASKPROC_H__
+
#include "asterisk/lock.h"
#include "asterisk/linkedlists.h"
#include "asterisk/utils.h"
#include "asterisk/options.h"
-#ifndef __taskprocessor_h__
-#define __taskprocessor_h__
-
/*!
- * \file taskprocessor.h
+ * \file
* \brief An API for managing task processing threads that can be shared across modules
*
* \author Dwayne M. Hubbard <dhubbard at digium.com>
+ * \author Russell Bryant <russell at digium.com>
*
* \note A taskprocessor is a named singleton containing a processing thread and
* a task queue that serializes tasks pushed into it by [a] module(s) that reference the taskprocessor.
- * A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get()
+ * A taskprocessor is created the first time its name is requested via the ast_taskproc_get()
* function and destroyed when the taskprocessor reference count reaches zero.
*
* Modules that obtain a reference to a taskprocessor can queue tasks into the taskprocessor
@@ -40,21 +42,21 @@
* of the queue. A task is a wrapper around a task-handling function pointer and a data
* pointer. It is the responsibility of the task handling function to free memory allocated for
* the task data pointer. A task is pushed into a taskprocessor queue using the
- * ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the
+ * ast_taskproc_push(taskprocessor, taskhandler, taskdata) function and freed by the
* taskprocessor after the task handling function returns. A module releases its reference to a
- * taskprocessor using the ast_taskprocessor_unreference() function which may result in the
+ * taskprocessor using the ast_taskproc_unref() function which may result in the
* destruction of the taskprocessor if the taskprocessor's reference count reaches zero. Tasks waiting
* to be processed in the taskprocessor queue when the taskprocessor reference count reaches zero
* will be purged and released from the taskprocessor queue without being processed.
*/
-struct ast_taskprocessor;
+struct ast_taskproc;
/*! \brief ast_tps_options for specification of taskprocessor options
*
- * Specify whether a taskprocessor should be created via ast_taskprocessor_get() if the taskprocessor
+ * Specify whether a taskprocessor should be created via ast_taskproc_get() if the taskprocessor
* does not already exist. The default behavior is to create a taskprocessor if it does not already exist
* and provide its reference to the calling function. To only return a reference to a taskprocessor if
- * and only if it exists, use the TPS_REF_IF_EXISTS option in ast_taskprocessor_get(). */
+ * and only if it exists, use the TPS_REF_IF_EXISTS option in ast_taskproc_get(). */
enum ast_tps_options {
/*! \brief return a reference to a taskprocessor, create one if it does not exist */
TPS_REF_DEFAULT = 0,
@@ -65,13 +67,13 @@
/*! \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
*
* The default behavior of instantiating a taskprocessor if one does not already exist can be
- * disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get().
+ * disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskproc_get().
* \param name The name of the taskprocessor
* \param create Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does
* not already exist
* return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the
* TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create);
+struct ast_taskproc *ast_taskproc_get(char *name, enum ast_tps_options create);
/*! \brief Unreference the specified taskprocessor and its reference count will decrement.
*
@@ -79,16 +81,38 @@
* themself when the taskprocessor reference count reaches zero.
* \param tps taskprocessor to unreference
* \return NULL */
-void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
+void *ast_taskproc_unref(struct ast_taskproc *tps);
+
+/*! \brief a task handler */
+typedef int (*ast_taskproc_cb)(void *datap);
/*! \brief Push a task into the specified taskprocessor queue and signal the taskprocessor thread
* \param tps The taskprocessor structure
* \param task_exe The task handling function to push into the taskprocessor queue
* \param datap The data to be used by the task handling function
* \return zero on success, -1 on failure */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+int ast_taskproc_push(struct ast_taskproc *tps, ast_taskproc_cb cb, void *datap);
-/*! \brief Return the name of the taskprocessor singleton */
-const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
-#endif
+/*!
+ * \brief Return the name of the taskprocessor singleton
+ *
+ * \note The return value is only valid as long as the calling code still holds
+ * a valid reference to the task processor.
+ */
+const char *ast_taskproc_name(struct ast_taskproc *tps);
+/*! \brief An opaque type for a pool of task processors */
+struct ast_taskproc_pool;
+
+/*!
+ * \brief Create a pool of task processors
+ *
+ * The pool is created without any task processors in it; use
+ * ast_taskproc_pool_set_size() to add some.
+ *
+ * \param name_prefix leading part of the name given to each task processor
+ * that is later created for this pool
+ *
+ * \return the new pool, or NULL if it could not be allocated
+ */
+struct ast_taskproc_pool *ast_taskproc_pool_create(const char *name_prefix);
+
+/*!
+ * \brief Grow or shrink a pool to the given number of task processors
+ *
+ * \param p the pool to resize
+ * \param size the number of task processors the pool should contain
+ *
+ * \return the number of task processors in the pool after the call, which is
+ * lower than \a size if a task processor could not be created
+ */
+unsigned int ast_taskproc_pool_set_size(struct ast_taskproc_pool *p,
+ unsigned int size);
+
+/*!
+ * \brief Destroy a pool of task processors
+ *
+ * Releases the pool's reference on its container of task processors and frees
+ * the pool itself. \a p must not be used afterwards.
+ *
+ * \param p the pool to destroy
+ *
+ * \return NULL, always
+ */
+struct ast_taskproc_pool *ast_taskproc_pool_destroy(struct ast_taskproc_pool *p);
+
+/*!
+ * \brief Queue a task to one of the task processors of a pool
+ *
+ * Successive calls walk through the task processors of the pool in turn and
+ * start over once the last one has been used.
+ *
+ * \param p the pool to queue the task to
+ * \param cb the task handling function
+ * \param data the data passed to \a cb
+ *
+ * \retval 0 the task was handed to a task processor
+ * \retval -1 the pool had no task processor to hand the task to; callers
+ * should treat any non-zero result as a failure
+ */
+int ast_taskproc_pool_push(struct ast_taskproc_pool *p, ast_taskproc_cb cb,
+ void *data);
+
+#endif /* __AST_TASKPROC_H__ */
+
Modified: team/russell/sched_thread/main/event.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/main/event.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/main/event.c (original)
+++ team/russell/sched_thread/main/event.c Sun Sep 14 19:07:38 2008
@@ -37,7 +37,7 @@
#include "asterisk/utils.h"
#include "asterisk/taskprocessor.h"
-struct ast_taskprocessor *event_dispatcher;
+struct ast_taskproc *event_dispatcher;
/*!
* \brief An event information element
@@ -1070,7 +1070,7 @@
event_ref->event = event;
- return ast_taskprocessor_push(event_dispatcher, handle_event, event_ref);
+ return ast_taskproc_push(event_dispatcher, handle_event, event_ref);
}
void ast_event_init(void)
@@ -1083,5 +1083,5 @@
for (i = 0; i < AST_EVENT_TOTAL; i++)
AST_RWLIST_HEAD_INIT(&ast_event_cache[i]);
- event_dispatcher = ast_taskprocessor_get("core_event_dispatcher", 0);
-}
+ event_dispatcher = ast_taskproc_get("core_event_dispatcher", 0);
+}
Modified: team/russell/sched_thread/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/main/pbx.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/main/pbx.c (original)
+++ team/russell/sched_thread/main/pbx.c Sun Sep 14 19:07:38 2008
@@ -124,7 +124,7 @@
struct ast_context;
struct ast_app;
-static struct ast_taskprocessor *device_state_tps;
+static struct ast_taskproc *device_state_tps;
AST_THREADSTORAGE(switch_data);
@@ -8410,7 +8410,7 @@
if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
return;
strcpy(sc->dev, device);
- if (ast_taskprocessor_push(device_state_tps, handle_statechange, sc) < 0) {
+ if (ast_taskproc_push(device_state_tps, handle_statechange, sc) < 0) {
ast_free(sc);
}
}
@@ -8421,7 +8421,7 @@
/* Initialize the PBX */
ast_verb(1, "Asterisk PBX Core Initializing\n");
- if (!(device_state_tps = ast_taskprocessor_get("pbx-core", 0))) {
+ if (!(device_state_tps = ast_taskproc_get("pbx-core", 0))) {
ast_log(LOG_WARNING, "failed to create pbx-core taskprocessor\n");
}
Modified: team/russell/sched_thread/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/main/taskprocessor.c?view=diff&rev=143120&r1=143119&r2=143120
==============================================================================
--- team/russell/sched_thread/main/taskprocessor.c (original)
+++ team/russell/sched_thread/main/taskprocessor.c Sun Sep 14 19:07:38 2008
@@ -1,9 +1,10 @@
/*
* Asterisk -- An open source telephony toolkit.
*
- * Copyright (C) 2007-2008, Dwayne M. Hubbard
+ * Copyright (C) 2007 - 2008, Digium, Inc.
*
* Dwayne M. Hubbard <dhubbard at digium.com>
+ * Russell Bryant <russell at digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
@@ -15,11 +16,13 @@
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
+
/*! \file
*
* \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
*
* \author Dwayne Hubbard <dhubbard at digium.com>
+ * \author Russell Bryant <russell at digium.com>
*/
#include "asterisk.h"
@@ -51,16 +54,16 @@
AST_LIST_ENTRY(tps_task) list;
};
-/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
-struct tps_taskprocessor_stats {
+/*! \brief tps_taskproc_stats maintain statistics for a taskprocessor. */
+struct tps_taskproc_stats {
/*! \brief This is the maximum number of tasks queued at any one time */
unsigned long max_qsize;
/*! \brief This is the current number of tasks processed */
unsigned long _tasks_processed_count;
};
-/*! \brief A ast_taskprocessor structure is a singleton by name */
-struct ast_taskprocessor {
+/*! \brief A ast_taskproc structure is a singleton by name */
+struct ast_taskproc {
/*! \brief Friendly name of the taskprocessor */
char *name;
/*! \brief Thread poll condition */
@@ -72,18 +75,24 @@
/*! \brief Taskprocesor thread run flag */
unsigned char poll_thread_run;
/*! \brief Taskprocessor statistics */
- struct tps_taskprocessor_stats *stats;
+ struct tps_taskproc_stats *stats;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
/*! \brief Taskprocessor singleton list entry */
- AST_LIST_ENTRY(ast_taskprocessor) list;
+ AST_LIST_ENTRY(ast_taskproc) list;
};
#define TPS_MAX_BUCKETS 7
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
static struct ao2_container *tps_singletons;
+/*! \brief A pool of task processors that queued tasks are spread across */
+struct ast_taskproc_pool {
+ /*! \brief Container holding the task processors that belong to this pool */
+ struct ao2_container *tps;
+ /*! \brief Position in tps of the task processor handed out last by ast_taskproc_pool_push() */
+ struct ao2_iterator iter;
+ /*! \brief Prefix of the generated task processor names. Declared with a
+  * single byte; the string is meant to extend into memory allocated directly
+  * behind the structure, so this must stay the last member. */
+ char name_prefix[1];
+};
+
/*! \brief CLI 'taskprocessor ping <blah>' operation requires a ping condition */
static ast_cond_t cli_ping_cond;
@@ -99,16 +108,16 @@
static void *tps_processing_function(void *data);
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
-static void tps_taskprocessor_destroy(void *tps);
+static void tps_taskproc_destroy(void *tps);
/*! \brief CLI 'taskprocessor ping <blah>' handler function */
static int tps_ping_handler(void *datap);
/*! \brief Remove the front task off the taskprocessor queue */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
+static struct tps_task *tps_taskproc_pop(struct ast_taskproc *tps);
/*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
+static int tps_taskproc_depth(struct ast_taskproc *tps);
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -153,7 +162,7 @@
}
/* taskprocessor tab completion */
-static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
+static char *tps_taskproc_tab_complete(struct ast_taskproc *p, struct ast_cli_args *a)
{
int tklen;
int wordnum = 0;
@@ -192,7 +201,7 @@
char *name;
struct timeval when;
struct timespec ts;
- struct ast_taskprocessor *tps = NULL;
+ struct ast_taskproc *tps = NULL;
switch (cmd) {
case CLI_INIT:
@@ -202,14 +211,14 @@
" Displays the time required for a task to be processed\n";
return NULL;
case CLI_GENERATE:
- return tps_taskprocessor_tab_complete(tps, a);
+ return tps_taskproc_tab_complete(tps, a);
}
if (a->argc != 4)
return CLI_SHOWUSAGE;
name = a->argv[3];
- if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
+ if (!(tps = ast_taskproc_get(name, TPS_REF_IF_EXISTS))) {
ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
return CLI_SUCCESS;
}
@@ -218,7 +227,7 @@
ts.tv_sec = when.tv_sec;
ts.tv_nsec = when.tv_usec * 1000;
ast_mutex_lock(&cli_ping_cond_lock);
- if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
+ if (ast_taskproc_push(tps, tps_ping_handler, 0) < 0) {
ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
ao2_ref(tps, -1);
return CLI_FAILURE;
@@ -239,7 +248,7 @@
unsigned long qsize;
unsigned long maxqsize;
unsigned long processed;
- struct ast_taskprocessor *p;
+ struct ast_taskproc *p;
struct ao2_iterator i;
switch (cmd) {
@@ -274,12 +283,12 @@
/* this is the task processing worker function */
static void *tps_processing_function(void *data)
{
- struct ast_taskprocessor *i = data;
+ struct ast_taskproc *i = data;
struct tps_task *t;
int size;
if (!i) {
- ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
+ ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskproc structure.\n");
return NULL;
}
@@ -289,7 +298,7 @@
ast_mutex_unlock(&i->taskprocessor_lock);
break;
}
- if (!(size = tps_taskprocessor_depth(i))) {
+ if (!(size = tps_taskproc_depth(i))) {
ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
if (!i->poll_thread_run) {
ast_mutex_unlock(&i->taskprocessor_lock);
@@ -298,7 +307,7 @@
}
ast_mutex_unlock(&i->taskprocessor_lock);
/* stuff is in the queue */
- if (!(t = tps_taskprocessor_pop(i))) {
+ if (!(t = tps_taskproc_pop(i))) {
ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
continue;
}
@@ -320,7 +329,7 @@
tps_task_free(t);
}
- while ((t = tps_taskprocessor_pop(i))) {
+ while ((t = tps_taskproc_pop(i))) {
tps_task_free(t);
}
return NULL;
@@ -329,7 +338,7 @@
/* hash callback for astobj2 */
static int tps_hash_cb(const void *obj, const int flags)
{
- const struct ast_taskprocessor *tps = obj;
+ const struct ast_taskproc *tps = obj;
return ast_str_hash(tps->name);
}
@@ -337,15 +346,15 @@
/* compare callback for astobj2 */
static int tps_cmp_cb(void *obj, void *arg, int flags)
{
- struct ast_taskprocessor *lhs = obj, *rhs = arg;
+ struct ast_taskproc *lhs = obj, *rhs = arg;
return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
}
/* destroy the taskprocessor */
-static void tps_taskprocessor_destroy(void *tps)
-{
- struct ast_taskprocessor *t = tps;
+static void tps_taskproc_destroy(void *tps)
+{
+ struct ast_taskproc *t = tps;
if (!tps) {
ast_log(LOG_ERROR, "missing taskprocessor\n");
@@ -370,7 +379,7 @@
}
/* pop the front task and return it */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
+static struct tps_task *tps_taskproc_pop(struct ast_taskproc *tps)
{
struct tps_task *task;
@@ -386,13 +395,13 @@
return task;
}
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+static int tps_taskproc_depth(struct ast_taskproc *tps)
{
return (tps) ? tps->tps_queue_size : -1;
}
/* taskprocessor name accessor */
-const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
+const char *ast_taskproc_name(struct ast_taskproc *tps)
{
if (!tps) {
ast_log(LOG_ERROR, "no taskprocessor specified!\n");
@@ -404,9 +413,9 @@
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
* create the taskprocessor if we were told via ast_tps_options to return a reference only
* if it already exists */
-struct ast_taskprocessor *ast_taskprocessor_get(char *name, enum ast_tps_options create)
-{
- struct ast_taskprocessor *p, tmp_tps = {
+struct ast_taskproc *ast_taskproc_get(char *name, enum ast_tps_options create)
+{
+ struct ast_taskproc *p, tmp_tps = {
.name = name,
};
@@ -426,7 +435,7 @@
return NULL;
}
/* create a new taskprocessor */
- if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
+ if (!(p = ao2_alloc(sizeof(*p), tps_taskproc_destroy))) {
ao2_unlock(tps_singletons);
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
@@ -465,7 +474,7 @@
}
/* decrement the taskprocessor reference count and unlink from the container if necessary */
-void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
+void *ast_taskproc_unref(struct ast_taskproc *tps)
{
if (tps) {
ao2_lock(tps_singletons);
@@ -479,7 +488,7 @@
}
/* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+int ast_taskproc_push(struct ast_taskproc *tps, int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
@@ -499,3 +508,104 @@
return 0;
}
+/*!
+ * \brief Allocate an empty pool of task processors
+ *
+ * The pool is returned without any task processors in it; call
+ * ast_taskproc_pool_set_size() to populate it.
+ *
+ * \param name_prefix leading part of the name given to each task processor
+ * created for this pool; the string is copied
+ *
+ * \return the new pool, or NULL if \a name_prefix is NULL or memory could
+ * not be allocated
+ */
+struct ast_taskproc_pool *ast_taskproc_pool_create(const char *name_prefix)
+{
+	struct ast_taskproc_pool *p;
+
+	if (!name_prefix) {
+		return NULL;
+	}
+
+	/*
+	 * name_prefix[] is declared with one byte, which accounts for the
+	 * terminating NUL. Only the characters of the prefix have to be added
+	 * on top of the structure size. (Multiplying here yields a zero byte
+	 * allocation for an empty prefix.)
+	 */
+	if (!(p = ast_calloc(1, sizeof(*p) + strlen(name_prefix)))) {
+		return NULL;
+	}
+
+	if (!(p->tps = ao2_container_alloc(1, tps_hash_cb, tps_cmp_cb))) {
+		ast_free(p);
+		return NULL;
+	}
+
+	/* Starting point for the round robin done by ast_taskproc_pool_push() */
+	p->iter = ao2_iterator_init(p->tps, 0);
+
+	strcpy(p->name_prefix, name_prefix); /* safe, room was allocated above */
+
+	return p;
+}
+
+/*!
+ * \brief ao2 callback that reports a match for whatever object it is given
+ *
+ * All three arguments are ignored. ast_taskproc_pool_set_size() passes this
+ * to ao2_callback() together with OBJ_UNLINK to take an object out of a pool
+ * without caring which one it is.
+ *
+ * \return CMP_MATCH, always
+ */
+static int cb_true(void *obj, void *arg, int flags)
+{
+ return CMP_MATCH;
+}
+
+/*!
+ * \brief Grow or shrink a pool to the requested number of task processors
+ *
+ * \param p the pool to resize
+ * \param size the number of task processors the pool should contain
+ *
+ * \return the number of task processors in the pool after resizing. This
+ * differs from \a size if a task processor could not be created, linked or
+ * unlinked, and is 0 if \a p is NULL.
+ */
+unsigned int ast_taskproc_pool_set_size(struct ast_taskproc_pool *p,
+	unsigned int size)
+{
+	unsigned int count;
+
+	if (!p) {
+		return 0;
+	}
+
+	ao2_lock(p->tps);
+
+	/* Handle downsizing */
+
+	while (ao2_container_count(p->tps) > size) {
+		struct ast_taskproc *tp;
+
+		if (!(tp = ao2_callback(p->tps, OBJ_UNLINK, cb_true, NULL))) {
+			/*
+			 * Nothing was unlinked, so the count did not change. Trying
+			 * again would spin forever with the pool locked.
+			 */
+			ast_log(LOG_ERROR, "taskprocessor unlink fail!!11\n");
+			break;
+		}
+
+		/* Give up the reference that ao2_callback() returned */
+		ast_taskproc_unref(tp);
+	}
+
+	/* Now handle an increase in size */
+
+	while (ao2_container_count(p->tps) < size) {
+		struct ast_taskproc *tp;
+		char buf[128];
+		int linked;
+
+		/*
+		 * Name the new task processor after the pool. The random suffix
+		 * makes a clash with an existing name unlikely, not impossible.
+		 * ast_random() returns a long, hence %ld.
+		 */
+		snprintf(buf, sizeof(buf), "%s-%ld", p->name_prefix, ast_random());
+
+		if (!(tp = ast_taskproc_get(buf, TPS_REF_DEFAULT))) {
+			break;
+		}
+
+		linked = ao2_link(p->tps, tp) ? 1 : 0;
+
+		/* The pool container holds its own reference now (if linking worked) */
+		ast_taskproc_unref(tp);
+
+		if (!linked) {
+			/* The count did not grow; do not keep creating task processors */
+			ast_log(LOG_ERROR, "failed to add taskprocessor '%s' to pool\n", buf);
+			break;
+		}
+	}
+
+	/* Read the result while the pool is still locked */
+	count = ao2_container_count(p->tps);
+
+	ao2_unlock(p->tps);
+
+	return count;
+}
+
+/*!
+ * \brief Destroy a pool and release the task processors it contains
+ *
+ * \param p the pool to destroy; NULL is accepted and ignored
+ *
+ * \return NULL, always, so that callers can clear their pointer with
+ * p = ast_taskproc_pool_destroy(p)
+ */
+struct ast_taskproc_pool *ast_taskproc_pool_destroy(struct ast_taskproc_pool *p)
+{
+	if (!p) {
+		return NULL;
+	}
+
+	/*
+	 * Empty the pool first so that every task processor is released through
+	 * ast_taskproc_unref(). Dropping the container alone only gives up the
+	 * pool's own references; the task processors would stay registered in
+	 * the taskprocessor singleton container and never be destroyed.
+	 */
+	ast_taskproc_pool_set_size(p, 0);
+
+	ao2_ref(p->tps, -1);
+	p->tps = NULL;
+
+	ast_free(p);
+
+	return NULL;
+}
+
+/*!
+ * \brief Queue a task to the next task processor of a pool
+ *
+ * Task processors are used in turn: every call advances the pool's iterator
+ * by one and starts over from the beginning once the end is reached.
+ *
+ * \param p the pool to queue the task to
+ * \param cb the task handling function
+ * \param data the data passed to \a cb
+ *
+ * \retval 0 the task was queued
+ * \retval -1 \a p is NULL, the pool is empty, or the selected task processor
+ * did not accept the task. The task will not run, so the caller still owns
+ * \a data.
+ */
+int ast_taskproc_pool_push(struct ast_taskproc_pool *p, ast_taskproc_cb cb,
+	void *data)
+{
+	struct ast_taskproc *tp;
+	int res;
+
+	if (!p) {
+		return -1;
+	}
+
+	ao2_lock(p->tps);
+
+	if (!(tp = ao2_iterator_next(&p->iter))) {
+		/* Reached the end of the pool, wrap around to the first one */
+		p->iter = ao2_iterator_init(p->tps, 0);
+		if (!(tp = ao2_iterator_next(&p->iter))) {
+			ast_log(LOG_ERROR, "Failed to push task into pool\n");
+			ao2_unlock(p->tps);
+			return -1;
+		}
+	}
+
+	/*
+	 * Report a failed push to the caller instead of claiming success.
+	 * Callers free their task data when told that the push failed.
+	 */
+	res = ast_taskproc_push(tp, cb, data);
+
+	ao2_unlock(p->tps);
+
+	/* Give up the reference that ao2_iterator_next() returned */
+	tp = ast_taskproc_unref(tp);
+
+	return res < 0 ? -1 : 0;
+}
More information about the asterisk-commits
mailing list