[Asterisk-code-review] taskprocessor: Merge ast_taskprocessor and ast_taskprocessor... (asterisk[master])
Corey Farrell
asteriskteam at digium.com
Mon Nov 12 05:54:37 CST 2018
Corey Farrell has uploaded this change for review. ( https://gerrit.asterisk.org/10619 )
Change subject: taskprocessor: Merge ast_taskprocessor and ast_taskprocessor_listener.
......................................................................
taskprocessor: Merge ast_taskprocessor and ast_taskprocessor_listener.
These objects are combined to eliminate the circular reference between
the two.
Change-Id: I105ad352cad14638009fac7bf1faab8580f1458e
---
M include/asterisk/taskprocessor.h
M main/taskprocessor.c
M main/threadpool.c
M tests/test_taskprocessor.c
4 files changed, 139 insertions(+), 211 deletions(-)
git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/19/10619/1
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f74989a..b1226fb 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -77,31 +77,29 @@
TPS_REF_IF_EXISTS = (1 << 0),
};
-struct ast_taskprocessor_listener;
-
-struct ast_taskprocessor_listener_callbacks {
+struct ast_taskprocessor_callbacks {
/*!
* \brief The taskprocessor has started completely
*
* This indicates that the taskprocessor is fully set up and the listener
* can now start interacting with it.
*
- * \param listener The listener to start
+ * \param tps The taskprocessor to start
*/
- int (*start)(struct ast_taskprocessor_listener *listener);
+ int (*start)(struct ast_taskprocessor *tps);
/*!
* \brief Indicates a task was pushed to the processor
*
- * \param listener The listener
+ * \param tps The taskprocessor the task was pushed to
* \param was_empty If non-zero, the taskprocessor was empty prior to the task being pushed
*/
- void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
+ void (*task_pushed)(struct ast_taskprocessor *tps, int was_empty);
/*!
* \brief Indicates the task processor has become empty
*
- * \param listener The listener
+ * \param tps The taskprocessor that has become empty
*/
- void (*emptied)(struct ast_taskprocessor_listener *listener);
+ void (*emptied)(struct ast_taskprocessor *tps);
/*!
* \brief Indicates the taskprocessor wishes to die.
*
@@ -112,44 +110,18 @@
* After this callback returns, it is NOT safe to operate on the
* listener's reference to the taskprocessor.
*
- * \param listener The listener
+ * \param tps The taskprocessor that is shutting down
*/
- void (*shutdown)(struct ast_taskprocessor_listener *listener);
- void (*dtor)(struct ast_taskprocessor_listener *listener);
+ void (*shutdown)(struct ast_taskprocessor *tps);
+ void (*dtor)(struct ast_taskprocessor *tps);
};
/*!
- * \brief Get a reference to the listener's taskprocessor
- *
- * This will return the taskprocessor with its reference count increased. Release
- * the reference to this object by using ast_taskprocessor_unreference()
- *
- * \param listener The listener that has the taskprocessor
- * \return The taskprocessor
- */
-struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener);
-
-/*!
* \brief Get the user data from the listener
- * \param listener The taskprocessor listener
- * \return The listener's user data
+ * \param tps The taskprocessor
+ * \return The taskprocessor's user data
*/
-void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener);
-
-/*!
- * \brief Allocate a taskprocessor listener
- *
- * \since 12.0.0
- *
- * This will result in the listener being allocated with the specified
- * callbacks.
- *
- * \param callbacks The callbacks to assign to the listener
- * \param user_data The user data for the listener
- * \retval NULL Failure
- * \retval non-NULL The newly allocated taskprocessor listener
- */
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data);
+void *ast_taskprocessor_get_user_data(const struct ast_taskprocessor *tps);
/*!
* \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
@@ -179,7 +151,8 @@
* \retval NULL Failure
* \reval non-NULL success
*/
-struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name,
+ const struct ast_taskprocessor_callbacks *callbacks, void *user_data);
/*!
* \brief Sets the local data associated with a taskprocessor.
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 2629eab..8d00475 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -80,6 +80,11 @@
long tps_queue_high;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
+ /*! The callbacks the taskprocessor calls into to notify of state changes */
+ const struct ast_taskprocessor_callbacks *callbacks;
+ /*! Data private to the listener */
+ void *user_data;
+
struct ast_taskprocessor_listener *listener;
/*! Current thread executing the tasks */
pthread_t thread;
@@ -104,12 +109,8 @@
* the module using the taskprocessor.
*/
struct ast_taskprocessor_listener {
- /*! The callbacks the taskprocessor calls into to notify of state changes */
- const struct ast_taskprocessor_listener_callbacks *callbacks;
/*! The taskprocessor that the listener is listening to */
struct ast_taskprocessor *tps;
- /*! Data private to the listener */
- void *user_data;
};
#ifdef LOW_MEMORY
@@ -157,24 +158,21 @@
ast_free(pvt);
}
-static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
+static void default_listener_pvt_dtor(struct ast_taskprocessor *tps)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ default_listener_pvt_destroy(tps->user_data);
- default_listener_pvt_destroy(pvt);
-
- listener->user_data = NULL;
+ tps->user_data = NULL;
}
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
*/
-static void *default_tps_processing_function(void *data)
+static void *default_tps_processing_function(void *obj)
{
- struct ast_taskprocessor_listener *listener = data;
- struct ast_taskprocessor *tps = listener->tps;
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ struct ast_taskprocessor *tps = obj;
+ struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
int sem_value;
int res;
@@ -194,25 +192,25 @@
ast_assert(res == 0 && sem_value == 0);
/* Free the shutdown reference (see default_listener_shutdown) */
- ao2_t_ref(listener->tps, -1, "tps-shutdown");
+ ao2_t_ref(tps, -1, "tps-shutdown");
return NULL;
}
-static int default_listener_start(struct ast_taskprocessor_listener *listener)
+static int default_listener_start(struct ast_taskprocessor *tps)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
- if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
+ if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, tps)) {
return -1;
}
return 0;
}
-static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+static void default_task_pushed(struct ast_taskprocessor *tps, int was_empty)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
if (ast_sem_post(&pvt->sem) != 0) {
ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
@@ -227,15 +225,15 @@
return 0;
}
-static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
+static void default_listener_shutdown(struct ast_taskprocessor *tps)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
int res;
/* Hold a reference during shutdown */
- ao2_t_ref(listener->tps, +1, "tps-shutdown");
+ ao2_t_ref(tps, +1, "tps-shutdown");
- if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
+ if (ast_taskprocessor_push(tps, default_listener_die, pvt)) {
/* This will cause the thread to exit early without completing tasks already
* in the queue. This is probably the least bad option in this situation. */
default_listener_die(pvt);
@@ -257,7 +255,7 @@
pvt->poll_thread = AST_PTHREADT_NULL;
}
-static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+static const struct ast_taskprocessor_callbacks default_listener_callbacks = {
.start = default_listener_start,
.task_pushed = default_task_pushed,
.shutdown = default_listener_shutdown,
@@ -642,8 +640,10 @@
ast_free((char *) t->name);
t->name = NULL;
- ao2_cleanup(t->listener);
- t->listener = NULL;
+
+ if (t->callbacks->dtor) {
+ t->callbacks->dtor(t);
+ }
}
/* pop the front task and return it */
@@ -676,44 +676,9 @@
return tps->name;
}
-static void listener_shutdown(struct ast_taskprocessor_listener *listener)
+void *ast_taskprocessor_get_user_data(const struct ast_taskprocessor *tps)
{
- listener->callbacks->shutdown(listener);
- ao2_ref(listener->tps, -1);
-}
-
-static void taskprocessor_listener_dtor(void *obj)
-{
- struct ast_taskprocessor_listener *listener = obj;
-
- if (listener->callbacks->dtor) {
- listener->callbacks->dtor(listener);
- }
-}
-
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
-{
- struct ast_taskprocessor_listener *listener;
-
- listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
- if (!listener) {
- return NULL;
- }
- listener->callbacks = callbacks;
- listener->user_data = user_data;
-
- return listener;
-}
-
-struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
-{
- ao2_ref(listener->tps, +1);
- return listener->tps;
-}
-
-void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
-{
- return listener->user_data;
+ return tps->user_data;
}
static void *default_listener_pvt_alloc(void)
@@ -733,16 +698,33 @@
return pvt;
}
-static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
+static void tps_fake_shutdown(const struct ast_taskprocessor_callbacks *callbacks, void *user_data)
+{
+ struct ast_taskprocessor tps = {
+ .user_data = user_data
+ };
+
+ if (callbacks->dtor) {
+ callbacks->dtor(&tps);
+ }
+}
+
+static struct ast_taskprocessor *__allocate_taskprocessor(const char *name,
+ const struct ast_taskprocessor_callbacks *callbacks, void *user_data)
{
struct ast_taskprocessor *p;
p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
+ tps_fake_shutdown(callbacks, user_data);
+
return NULL;
}
+ p->callbacks = callbacks;
+ p->user_data = user_data;
+
/* Set default congestion water level alert triggers. */
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
@@ -753,22 +735,15 @@
return NULL;
}
- ao2_ref(listener, +1);
- p->listener = listener;
-
p->thread = AST_PTHREADT_NULL;
- ao2_ref(p, +1);
- listener->tps = p;
-
if (!(ao2_link(tps_singletons, p))) {
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
- listener->tps = NULL;
- ao2_ref(p, -2);
+ ao2_ref(p, -1);
return NULL;
}
- if (p->listener->callbacks->start(p->listener)) {
+ if (p->callbacks->start(p)) {
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
p->name);
ast_taskprocessor_unreference(p);
@@ -784,7 +759,6 @@
struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
{
struct ast_taskprocessor *p;
- struct ast_taskprocessor_listener *listener;
struct default_taskprocessor_listener_pvt *pvt;
if (ast_strlen_zero(name)) {
@@ -799,36 +773,32 @@
/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
return NULL;
}
- /* Create a new taskprocessor. Start by creating a default listener */
+
pvt = default_listener_pvt_alloc();
if (!pvt) {
return NULL;
}
- listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
- if (!listener) {
- default_listener_pvt_destroy(pvt);
- return NULL;
- }
- p = __allocate_taskprocessor(name, listener);
-
- ao2_ref(listener, -1);
- return p;
+ return __allocate_taskprocessor(name, &default_listener_callbacks, pvt);
}
-struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name,
+ const struct ast_taskprocessor_callbacks *callbacks, void *user_data)
{
struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
if (p) {
ast_taskprocessor_unreference(p);
+
+ tps_fake_shutdown(callbacks, user_data);
+
return NULL;
}
- return __allocate_taskprocessor(name, listener);
+
+ return __allocate_taskprocessor(name, callbacks, user_data);
}
-void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
- void *local_data)
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
{
SCOPED_AO2LOCK(lock, tps);
tps->local_data = local_data;
@@ -847,20 +817,23 @@
*/
ao2_lock(tps_singletons);
- if (ao2_ref(tps, -1) > 3) {
+ if (ao2_ref(tps, -1) > 2) {
ao2_unlock(tps_singletons);
return NULL;
}
- /* If we're down to 3 references, then those must be:
+ /* If we're down to 2 references, then those must be:
* 1. The reference we just got rid of
* 2. The container
- * 3. The listener
*/
- ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
+
+ /* Steal the container reference */
+ ao2_find(tps_singletons, tps, OBJ_UNLINK | OBJ_POINTER | OBJ_NOLOCK);
ao2_unlock(tps_singletons);
- listener_shutdown(tps->listener);
+ tps->callbacks->shutdown(tps);
+ ao2_ref(tps, -1);
+
return NULL;
}
@@ -897,7 +870,7 @@
/* The currently executing task counts as still in queue */
was_empty = tps->executing ? 0 : previous_size == 0;
ao2_unlock(tps);
- tps->listener->callbacks->task_pushed(tps->listener, was_empty);
+ tps->callbacks->task_pushed(tps, was_empty);
return 0;
}
@@ -986,8 +959,8 @@
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
- if (size == 0 && tps->listener->callbacks->emptied) {
- tps->listener->callbacks->emptied(tps->listener);
+ if (size == 0 && tps->callbacks->emptied) {
+ tps->callbacks->emptied(tps);
}
return size > 0;
}
diff --git a/main/threadpool.c b/main/threadpool.c
index 7729930..dc618f9 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -386,6 +386,10 @@
static void threadpool_destructor(void *obj)
{
struct ast_threadpool *pool = obj;
+
+ ao2_cleanup(pool->active_threads);
+ ao2_cleanup(pool->idle_threads);
+ ao2_cleanup(pool->zombie_threads);
ao2_cleanup(pool->listener);
}
@@ -438,7 +442,7 @@
return pool;
}
-static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
+static int threadpool_tps_start(struct ast_taskprocessor *tps)
{
return 0;
}
@@ -604,10 +608,9 @@
* \param listener The taskprocessor listener. The threadpool is the listener's private data
* \param was_empty True if the taskprocessor was empty prior to the task being pushed
*/
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
- int was_empty)
+static void threadpool_tps_task_pushed(struct ast_taskprocessor *tps, int was_empty)
{
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
struct task_pushed_data *tpd;
SCOPED_AO2LOCK(lock, pool);
@@ -648,9 +651,9 @@
* the threadpool no longer contains any tasks.
* \param listener The taskprocessor listener. The threadpool is the listener's private data.
*/
-static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
+static void threadpool_tps_emptied(struct ast_taskprocessor *tps)
{
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
@@ -673,27 +676,31 @@
* in outright destroying the worker threads here.
* \param listener The taskprocessor listener. The threadpool is the listener's private data.
*/
-static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
+static void threadpool_tps_shutdown(struct ast_taskprocessor *tps)
{
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
if (pool->listener && pool->listener->callbacks->shutdown) {
pool->listener->callbacks->shutdown(pool->listener);
}
- ao2_cleanup(pool->active_threads);
- ao2_cleanup(pool->idle_threads);
- ao2_cleanup(pool->zombie_threads);
+}
+
+static void threadpool_tps_dtor(struct ast_taskprocessor *tps)
+{
+ struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
+
ao2_cleanup(pool);
}
/*!
* \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
*/
-static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
+static struct ast_taskprocessor_callbacks threadpool_tps_listener_callbacks = {
.start = threadpool_tps_start,
.task_pushed = threadpool_tps_task_pushed,
.emptied = threadpool_tps_emptied,
.shutdown = threadpool_tps_shutdown,
+ .dtor = threadpool_tps_dtor,
};
/*!
@@ -913,37 +920,26 @@
struct ast_threadpool_listener *listener,
const struct ast_threadpool_options *options)
{
- struct ast_taskprocessor *tps;
- RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
- RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
-
- pool = threadpool_alloc(name, options);
- if (!pool) {
- return NULL;
- }
-
- tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
- if (!tps_listener) {
- return NULL;
- }
+ struct ast_threadpool *pool;
if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
return NULL;
}
- tps = ast_taskprocessor_create_with_listener(name, tps_listener);
- if (!tps) {
+ pool = threadpool_alloc(name, options);
+ if (!pool) {
return NULL;
}
- pool->tps = tps;
- if (listener) {
- ao2_ref(listener, +1);
- pool->listener = listener;
+ pool->tps = ast_taskprocessor_create_with_listener(name, &threadpool_tps_listener_callbacks, pool);
+ if (!pool->tps) {
+ return NULL;
}
+
+ pool->listener = ao2_bump(listener);
ast_threadpool_set_size(pool, pool->options.initial_size);
- ao2_ref(pool, +1);
+
return pool;
}
@@ -1313,7 +1309,7 @@
struct ast_serializer_shutdown_group *shutdown_group;
};
-static void serializer_dtor(void *obj)
+static void serializer_destructor(void *obj)
{
struct serializer *ser = obj;
@@ -1328,7 +1324,7 @@
{
struct serializer *ser;
- ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+ ser = ao2_alloc_options(sizeof(*ser), serializer_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!ser) {
return NULL;
}
@@ -1354,38 +1350,44 @@
return 0;
}
-static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+static void serializer_task_pushed(struct ast_taskprocessor *tps, int was_empty)
{
if (was_empty) {
- struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
- struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+ struct serializer *ser = ast_taskprocessor_get_user_data(tps);
- if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
+ if (ast_threadpool_push(ser->pool, execute_tasks, ao2_bump(tps))) {
ast_taskprocessor_unreference(tps);
}
}
}
-static int serializer_start(struct ast_taskprocessor_listener *listener)
+static int serializer_start(struct ast_taskprocessor *tps)
{
/* No-op */
return 0;
}
-static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+static void serializer_shutdown(struct ast_taskprocessor *tps)
{
- struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ struct serializer *ser = ast_taskprocessor_get_user_data(tps);
if (ser->shutdown_group) {
serializer_shutdown_group_dec(ser->shutdown_group);
}
+}
+
+static void serializer_dtor(struct ast_taskprocessor *tps)
+{
+ struct serializer *ser = ast_taskprocessor_get_user_data(tps);
+
ao2_cleanup(ser);
}
-static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+static struct ast_taskprocessor_callbacks serializer_tps_listener_callbacks = {
.task_pushed = serializer_task_pushed,
.start = serializer_start,
.shutdown = serializer_shutdown,
+ .dtor = serializer_dtor,
};
struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
@@ -1397,7 +1399,6 @@
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
- struct ast_taskprocessor_listener *listener;
struct ast_taskprocessor *tps;
ser = serializer_create(pool, shutdown_group);
@@ -1405,21 +1406,11 @@
return NULL;
}
- listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
- if (!listener) {
- ao2_ref(ser, -1);
- return NULL;
- }
-
- tps = ast_taskprocessor_create_with_listener(name, listener);
- if (!tps) {
- /* ser ref transferred to listener but not cleaned without tps */
- ao2_ref(ser, -1);
- } else if (shutdown_group) {
+ tps = ast_taskprocessor_create_with_listener(name, &serializer_tps_listener_callbacks, ser);
+ if (tps && shutdown_group) {
serializer_shutdown_group_inc(shutdown_group);
}
- ao2_ref(listener, -1);
return tps;
}
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 6428746..eaee0dd 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -309,7 +309,7 @@
/*!
* \brief test taskprocessor listener's start callback
*/
-static int test_start(struct ast_taskprocessor_listener *listener)
+static int test_start(struct ast_taskprocessor *tps)
{
return 0;
}
@@ -319,9 +319,9 @@
*
* Adjusts private data's stats as indicated by the parameters.
*/
-static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+static void test_task_pushed(struct ast_taskprocessor *tps, int was_empty)
{
- struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
+ struct test_listener_pvt *pvt = ast_taskprocessor_get_user_data(tps);
++pvt->num_pushed;
if (was_empty) {
++pvt->num_was_empty;
@@ -331,22 +331,22 @@
/*!
* \brief test taskprocessor listener's emptied callback.
*/
-static void test_emptied(struct ast_taskprocessor_listener *listener)
+static void test_emptied(struct ast_taskprocessor *tps)
{
- struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
+ struct test_listener_pvt *pvt = ast_taskprocessor_get_user_data(tps);
++pvt->num_emptied;
}
/*!
* \brief test taskprocessor listener's shutdown callback.
*/
-static void test_shutdown(struct ast_taskprocessor_listener *listener)
+static void test_shutdown(struct ast_taskprocessor *tps)
{
- struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
+ struct test_listener_pvt *pvt = ast_taskprocessor_get_user_data(tps);
pvt->shutdown = 1;
}
-static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
+static const struct ast_taskprocessor_callbacks test_callbacks = {
.start = test_start,
.task_pushed = test_task_pushed,
.emptied = test_emptied,
@@ -409,7 +409,6 @@
AST_TEST_DEFINE(taskprocessor_listener)
{
struct ast_taskprocessor *tps = NULL;
- struct ast_taskprocessor_listener *listener = NULL;
struct test_listener_pvt *pvt = NULL;
enum ast_test_result_state res = AST_TEST_PASS;
@@ -431,14 +430,7 @@
return AST_TEST_FAIL;
}
- listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
- if (!listener) {
- ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
- res = AST_TEST_FAIL;
- goto test_exit;
- }
-
- tps = ast_taskprocessor_create_with_listener("test_listener", listener);
+ tps = ast_taskprocessor_create_with_listener("test_listener", &test_callbacks, pvt);
if (!tps) {
ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
res = AST_TEST_FAIL;
@@ -489,7 +481,6 @@
}
test_exit:
- ao2_cleanup(listener);
/* This is safe even if tps is NULL */
ast_taskprocessor_unreference(tps);
ast_free(pvt);
--
To view, visit https://gerrit.asterisk.org/10619
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I105ad352cad14638009fac7bf1faab8580f1458e
Gerrit-Change-Number: 10619
Gerrit-PatchSet: 1
Gerrit-Owner: Corey Farrell <git at cfware.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181112/21b3ab24/attachment-0001.html>
More information about the asterisk-code-review
mailing list