[Asterisk-code-review] taskprocessor: Merge ast taskprocessor and ast taskprocessor... (asterisk[master])

Corey Farrell asteriskteam at digium.com
Mon Nov 12 05:54:37 CST 2018


Corey Farrell has uploaded this change for review. ( https://gerrit.asterisk.org/10619


Change subject: taskprocessor: Merge ast_taskprocessor and ast_taskprocessor_listener.
......................................................................

taskprocessor: Merge ast_taskprocessor and ast_taskprocessor_listener.

These objects are combined to eliminate the circular reference between
the two.

Change-Id: I105ad352cad14638009fac7bf1faab8580f1458e
---
M include/asterisk/taskprocessor.h
M main/taskprocessor.c
M main/threadpool.c
M tests/test_taskprocessor.c
4 files changed, 139 insertions(+), 211 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/19/10619/1

diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f74989a..b1226fb 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -77,31 +77,29 @@
 	TPS_REF_IF_EXISTS = (1 << 0),
 };
 
-struct ast_taskprocessor_listener;
-
-struct ast_taskprocessor_listener_callbacks {
+struct ast_taskprocessor_callbacks {
 	/*!
 	 * \brief The taskprocessor has started completely
 	 *
 	 * This indicates that the taskprocessor is fully set up and the listener
 	 * can now start interacting with it.
 	 *
-	 * \param listener The listener to start
+	 * \param tps The taskprocessor to start
 	 */
-	int (*start)(struct ast_taskprocessor_listener *listener);
+	int (*start)(struct ast_taskprocessor *tps);
 	/*!
 	 * \brief Indicates a task was pushed to the processor
 	 *
-	 * \param listener The listener
+	 * \param tps The taskprocessor to start
 	 * \param was_empty If non-zero, the taskprocessor was empty prior to the task being pushed
 	 */
-	void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
+	void (*task_pushed)(struct ast_taskprocessor *tps, int was_empty);
 	/*!
 	 * \brief Indicates the task processor has become empty
 	 *
-	 * \param listener The listener
+	 * \param tps The taskprocessor to start
 	 */
-	void (*emptied)(struct ast_taskprocessor_listener *listener);
+	void (*emptied)(struct ast_taskprocessor *tps);
 	/*!
 	 * \brief Indicates the taskprocessor wishes to die.
 	 *
@@ -112,44 +110,18 @@
 	 * After this callback returns, it is NOT safe to operate on the
 	 * listener's reference to the taskprocessor.
 	 *
-	 * \param listener The listener
+	 * \param tps The taskprocessor to start
 	 */
-	void (*shutdown)(struct ast_taskprocessor_listener *listener);
-	void (*dtor)(struct ast_taskprocessor_listener *listener);
+	void (*shutdown)(struct ast_taskprocessor *tps);
+	void (*dtor)(struct ast_taskprocessor *tps);
 };
 
 /*!
- * \brief Get a reference to the listener's taskprocessor
- *
- * This will return the taskprocessor with its reference count increased. Release
- * the reference to this object by using ast_taskprocessor_unreference()
- *
- * \param listener The listener that has the taskprocessor
- * \return The taskprocessor
- */
-struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener);
-
-/*!
  * \brief Get the user data from the listener
- * \param listener The taskprocessor listener
- * \return The listener's user data
+ * \param tps The taskprocessor
+ * \return The taskprocessor's user data
  */
-void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener);
-
-/*!
- * \brief Allocate a taskprocessor listener
- *
- * \since 12.0.0
- *
- * This will result in the listener being allocated with the specified
- * callbacks.
- *
- * \param callbacks The callbacks to assign to the listener
- * \param user_data The user data for the listener
- * \retval NULL Failure
- * \retval non-NULL The newly allocated taskprocessor listener
- */
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data);
+void *ast_taskprocessor_get_user_data(const struct ast_taskprocessor *tps);
 
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
@@ -179,7 +151,8 @@
  * \retval NULL Failure
  * \reval non-NULL success
  */
-struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name,
+	const struct ast_taskprocessor_callbacks *callbacks, void *user_data);
 
 /*!
  * \brief Sets the local data associated with a taskprocessor.
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 2629eab..8d00475 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -80,6 +80,11 @@
 	long tps_queue_high;
 	/*! \brief Taskprocessor queue */
 	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
+	/*! The callbacks the taskprocessor calls into to notify of state changes */
+	const struct ast_taskprocessor_callbacks *callbacks;
+	/*! Data private to the listener */
+	void *user_data;
+
 	struct ast_taskprocessor_listener *listener;
 	/*! Current thread executing the tasks */
 	pthread_t thread;
@@ -104,12 +109,8 @@
  * the module using the taskprocessor.
  */
 struct ast_taskprocessor_listener {
-	/*! The callbacks the taskprocessor calls into to notify of state changes */
-	const struct ast_taskprocessor_listener_callbacks *callbacks;
 	/*! The taskprocessor that the listener is listening to */
 	struct ast_taskprocessor *tps;
-	/*! Data private to the listener */
-	void *user_data;
 };
 
 #ifdef LOW_MEMORY
@@ -157,24 +158,21 @@
 	ast_free(pvt);
 }
 
-static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
+static void default_listener_pvt_dtor(struct ast_taskprocessor *tps)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+	default_listener_pvt_destroy(tps->user_data);
 
-	default_listener_pvt_destroy(pvt);
-
-	listener->user_data = NULL;
+	tps->user_data = NULL;
 }
 
 /*!
  * \brief Function that processes tasks in the taskprocessor
  * \internal
  */
-static void *default_tps_processing_function(void *data)
+static void *default_tps_processing_function(void *obj)
 {
-	struct ast_taskprocessor_listener *listener = data;
-	struct ast_taskprocessor *tps = listener->tps;
-	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+	struct ast_taskprocessor *tps = obj;
+	struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
 	int sem_value;
 	int res;
 
@@ -194,25 +192,25 @@
 	ast_assert(res == 0 && sem_value == 0);
 
 	/* Free the shutdown reference (see default_listener_shutdown) */
-	ao2_t_ref(listener->tps, -1, "tps-shutdown");
+	ao2_t_ref(tps, -1, "tps-shutdown");
 
 	return NULL;
 }
 
-static int default_listener_start(struct ast_taskprocessor_listener *listener)
+static int default_listener_start(struct ast_taskprocessor *tps)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+	struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
 
-	if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
+	if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, tps)) {
 		return -1;
 	}
 
 	return 0;
 }
 
-static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+static void default_task_pushed(struct ast_taskprocessor *tps, int was_empty)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+	struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
 
 	if (ast_sem_post(&pvt->sem) != 0) {
 		ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
@@ -227,15 +225,15 @@
 	return 0;
 }
 
-static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
+static void default_listener_shutdown(struct ast_taskprocessor *tps)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+	struct default_taskprocessor_listener_pvt *pvt = tps->user_data;
 	int res;
 
 	/* Hold a reference during shutdown */
-	ao2_t_ref(listener->tps, +1, "tps-shutdown");
+	ao2_t_ref(tps, +1, "tps-shutdown");
 
-	if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
+	if (ast_taskprocessor_push(tps, default_listener_die, pvt)) {
 		/* This will cause the thread to exit early without completing tasks already
 		 * in the queue.  This is probably the least bad option in this situation. */
 		default_listener_die(pvt);
@@ -257,7 +255,7 @@
 	pvt->poll_thread = AST_PTHREADT_NULL;
 }
 
-static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+static const struct ast_taskprocessor_callbacks default_listener_callbacks = {
 	.start = default_listener_start,
 	.task_pushed = default_task_pushed,
 	.shutdown = default_listener_shutdown,
@@ -642,8 +640,10 @@
 
 	ast_free((char *) t->name);
 	t->name = NULL;
-	ao2_cleanup(t->listener);
-	t->listener = NULL;
+
+	if (t->callbacks->dtor) {
+		t->callbacks->dtor(t);
+	}
 }
 
 /* pop the front task and return it */
@@ -676,44 +676,9 @@
 	return tps->name;
 }
 
-static void listener_shutdown(struct ast_taskprocessor_listener *listener)
+void *ast_taskprocessor_get_user_data(const struct ast_taskprocessor *tps)
 {
-	listener->callbacks->shutdown(listener);
-	ao2_ref(listener->tps, -1);
-}
-
-static void taskprocessor_listener_dtor(void *obj)
-{
-	struct ast_taskprocessor_listener *listener = obj;
-
-	if (listener->callbacks->dtor) {
-		listener->callbacks->dtor(listener);
-	}
-}
-
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
-{
-	struct ast_taskprocessor_listener *listener;
-
-	listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
-	if (!listener) {
-		return NULL;
-	}
-	listener->callbacks = callbacks;
-	listener->user_data = user_data;
-
-	return listener;
-}
-
-struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
-{
-	ao2_ref(listener->tps, +1);
-	return listener->tps;
-}
-
-void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
-{
-	return listener->user_data;
+	return tps->user_data;
 }
 
 static void *default_listener_pvt_alloc(void)
@@ -733,16 +698,33 @@
 	return pvt;
 }
 
-static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
+static void tps_fake_shutdown(const struct ast_taskprocessor_callbacks *callbacks, void *user_data)
+{
+	struct ast_taskprocessor tps = {
+		.user_data = user_data
+	};
+
+	if (callbacks->dtor) {
+		callbacks->dtor(&tps);
+	}
+}
+
+static struct ast_taskprocessor *__allocate_taskprocessor(const char *name,
+	const struct ast_taskprocessor_callbacks *callbacks, void *user_data)
 {
 	struct ast_taskprocessor *p;
 
 	p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
 	if (!p) {
 		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
+		tps_fake_shutdown(callbacks, user_data);
+
 		return NULL;
 	}
 
+	p->callbacks = callbacks;
+	p->user_data = user_data;
+
 	/* Set default congestion water level alert triggers. */
 	p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
 	p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
@@ -753,22 +735,15 @@
 		return NULL;
 	}
 
-	ao2_ref(listener, +1);
-	p->listener = listener;
-
 	p->thread = AST_PTHREADT_NULL;
 
-	ao2_ref(p, +1);
-	listener->tps = p;
-
 	if (!(ao2_link(tps_singletons, p))) {
 		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
-		listener->tps = NULL;
-		ao2_ref(p, -2);
+		ao2_ref(p, -1);
 		return NULL;
 	}
 
-	if (p->listener->callbacks->start(p->listener)) {
+	if (p->callbacks->start(p)) {
 		ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
 			p->name);
 		ast_taskprocessor_unreference(p);
@@ -784,7 +759,6 @@
 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
 {
 	struct ast_taskprocessor *p;
-	struct ast_taskprocessor_listener *listener;
 	struct default_taskprocessor_listener_pvt *pvt;
 
 	if (ast_strlen_zero(name)) {
@@ -799,36 +773,32 @@
 		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
 		return NULL;
 	}
-	/* Create a new taskprocessor. Start by creating a default listener */
+
 	pvt = default_listener_pvt_alloc();
 	if (!pvt) {
 		return NULL;
 	}
-	listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
-	if (!listener) {
-		default_listener_pvt_destroy(pvt);
-		return NULL;
-	}
 
-	p = __allocate_taskprocessor(name, listener);
-
-	ao2_ref(listener, -1);
-	return p;
+	return __allocate_taskprocessor(name, &default_listener_callbacks, pvt);
 }
 
-struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name,
+	const struct ast_taskprocessor_callbacks *callbacks, void *user_data)
 {
 	struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
 
 	if (p) {
 		ast_taskprocessor_unreference(p);
+
+		tps_fake_shutdown(callbacks, user_data);
+
 		return NULL;
 	}
-	return __allocate_taskprocessor(name, listener);
+
+	return __allocate_taskprocessor(name, callbacks, user_data);
 }
 
-void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
-	void *local_data)
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
 {
 	SCOPED_AO2LOCK(lock, tps);
 	tps->local_data = local_data;
@@ -847,20 +817,23 @@
 	 */
 	ao2_lock(tps_singletons);
 
-	if (ao2_ref(tps, -1) > 3) {
+	if (ao2_ref(tps, -1) > 2) {
 		ao2_unlock(tps_singletons);
 		return NULL;
 	}
 
-	/* If we're down to 3 references, then those must be:
+	/* If we're down to 2 references, then those must be:
 	 * 1. The reference we just got rid of
 	 * 2. The container
-	 * 3. The listener
 	 */
-	ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
+
+	/* Steal the container reference */
+	ao2_find(tps_singletons, tps, OBJ_UNLINK | OBJ_POINTER | OBJ_NOLOCK);
 	ao2_unlock(tps_singletons);
 
-	listener_shutdown(tps->listener);
+	tps->callbacks->shutdown(tps);
+	ao2_ref(tps, -1);
+
 	return NULL;
 }
 
@@ -897,7 +870,7 @@
 	/* The currently executing task counts as still in queue */
 	was_empty = tps->executing ? 0 : previous_size == 0;
 	ao2_unlock(tps);
-	tps->listener->callbacks->task_pushed(tps->listener, was_empty);
+	tps->callbacks->task_pushed(tps, was_empty);
 	return 0;
 }
 
@@ -986,8 +959,8 @@
 	ao2_unlock(tps);
 
 	/* If we executed a task, check for the transition to empty */
-	if (size == 0 && tps->listener->callbacks->emptied) {
-		tps->listener->callbacks->emptied(tps->listener);
+	if (size == 0 && tps->callbacks->emptied) {
+		tps->callbacks->emptied(tps);
 	}
 	return size > 0;
 }
diff --git a/main/threadpool.c b/main/threadpool.c
index 7729930..dc618f9 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -386,6 +386,10 @@
 static void threadpool_destructor(void *obj)
 {
 	struct ast_threadpool *pool = obj;
+
+	ao2_cleanup(pool->active_threads);
+	ao2_cleanup(pool->idle_threads);
+	ao2_cleanup(pool->zombie_threads);
 	ao2_cleanup(pool->listener);
 }
 
@@ -438,7 +442,7 @@
 	return pool;
 }
 
-static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
+static int threadpool_tps_start(struct ast_taskprocessor *tps)
 {
 	return 0;
 }
@@ -604,10 +608,9 @@
  * \param listener The taskprocessor listener. The threadpool is the listener's private data
  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
  */
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
-		int was_empty)
+static void threadpool_tps_task_pushed(struct ast_taskprocessor *tps, int was_empty)
 {
-	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
+	struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
 	struct task_pushed_data *tpd;
 	SCOPED_AO2LOCK(lock, pool);
 
@@ -648,9 +651,9 @@
  * the threadpool no longer contains any tasks.
  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
  */
-static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
+static void threadpool_tps_emptied(struct ast_taskprocessor *tps)
 {
-	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
+	struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
 	SCOPED_AO2LOCK(lock, pool);
 
 	if (pool->shutting_down) {
@@ -673,27 +676,31 @@
  * in outright destroying the worker threads here.
  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
  */
-static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
+static void threadpool_tps_shutdown(struct ast_taskprocessor *tps)
 {
-	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
+	struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
 
 	if (pool->listener && pool->listener->callbacks->shutdown) {
 		pool->listener->callbacks->shutdown(pool->listener);
 	}
-	ao2_cleanup(pool->active_threads);
-	ao2_cleanup(pool->idle_threads);
-	ao2_cleanup(pool->zombie_threads);
+}
+
+static void threadpool_tps_dtor(struct ast_taskprocessor *tps)
+{
+	struct ast_threadpool *pool = ast_taskprocessor_get_user_data(tps);
+
 	ao2_cleanup(pool);
 }
 
 /*!
  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
  */
-static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
+static struct ast_taskprocessor_callbacks threadpool_tps_listener_callbacks = {
 	.start = threadpool_tps_start,
 	.task_pushed = threadpool_tps_task_pushed,
 	.emptied = threadpool_tps_emptied,
 	.shutdown = threadpool_tps_shutdown,
+	.dtor = threadpool_tps_dtor,
 };
 
 /*!
@@ -913,37 +920,26 @@
 		struct ast_threadpool_listener *listener,
 		const struct ast_threadpool_options *options)
 {
-	struct ast_taskprocessor *tps;
-	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
-	RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
-
-	pool = threadpool_alloc(name, options);
-	if (!pool) {
-		return NULL;
-	}
-
-	tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
-	if (!tps_listener) {
-		return NULL;
-	}
+	struct ast_threadpool *pool;
 
 	if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
 		ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
 		return NULL;
 	}
 
-	tps = ast_taskprocessor_create_with_listener(name, tps_listener);
-	if (!tps) {
+	pool = threadpool_alloc(name, options);
+	if (!pool) {
 		return NULL;
 	}
 
-	pool->tps = tps;
-	if (listener) {
-		ao2_ref(listener, +1);
-		pool->listener = listener;
+	pool->tps = ast_taskprocessor_create_with_listener(name, &threadpool_tps_listener_callbacks, pool);
+	if (!pool->tps) {
+		return NULL;
 	}
+
+	pool->listener = ao2_bump(listener);
 	ast_threadpool_set_size(pool, pool->options.initial_size);
-	ao2_ref(pool, +1);
+
 	return pool;
 }
 
@@ -1313,7 +1309,7 @@
 	struct ast_serializer_shutdown_group *shutdown_group;
 };
 
-static void serializer_dtor(void *obj)
+static void serializer_destructor(void *obj)
 {
 	struct serializer *ser = obj;
 
@@ -1328,7 +1324,7 @@
 {
 	struct serializer *ser;
 
-	ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+	ser = ao2_alloc_options(sizeof(*ser), serializer_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
 	if (!ser) {
 		return NULL;
 	}
@@ -1354,38 +1350,44 @@
 	return 0;
 }
 
-static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+static void serializer_task_pushed(struct ast_taskprocessor *tps, int was_empty)
 {
 	if (was_empty) {
-		struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
-		struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+		struct serializer *ser = ast_taskprocessor_get_user_data(tps);
 
-		if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
+		if (ast_threadpool_push(ser->pool, execute_tasks, ao2_bump(tps))) {
 			ast_taskprocessor_unreference(tps);
 		}
 	}
 }
 
-static int serializer_start(struct ast_taskprocessor_listener *listener)
+static int serializer_start(struct ast_taskprocessor *tps)
 {
 	/* No-op */
 	return 0;
 }
 
-static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+static void serializer_shutdown(struct ast_taskprocessor *tps)
 {
-	struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+	struct serializer *ser = ast_taskprocessor_get_user_data(tps);
 
 	if (ser->shutdown_group) {
 		serializer_shutdown_group_dec(ser->shutdown_group);
 	}
+}
+
+static void serializer_dtor(struct ast_taskprocessor *tps)
+{
+	struct serializer *ser = ast_taskprocessor_get_user_data(tps);
+
 	ao2_cleanup(ser);
 }
 
-static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+static struct ast_taskprocessor_callbacks serializer_tps_listener_callbacks = {
 	.task_pushed = serializer_task_pushed,
 	.start = serializer_start,
 	.shutdown = serializer_shutdown,
+	.dtor = serializer_dtor,
 };
 
 struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
@@ -1397,7 +1399,6 @@
 	struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
 {
 	struct serializer *ser;
-	struct ast_taskprocessor_listener *listener;
 	struct ast_taskprocessor *tps;
 
 	ser = serializer_create(pool, shutdown_group);
@@ -1405,21 +1406,11 @@
 		return NULL;
 	}
 
-	listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
-	if (!listener) {
-		ao2_ref(ser, -1);
-		return NULL;
-	}
-
-	tps = ast_taskprocessor_create_with_listener(name, listener);
-	if (!tps) {
-		/* ser ref transferred to listener but not cleaned without tps */
-		ao2_ref(ser, -1);
-	} else if (shutdown_group) {
+	tps = ast_taskprocessor_create_with_listener(name, &serializer_tps_listener_callbacks, ser);
+	if (tps && shutdown_group) {
 		serializer_shutdown_group_inc(shutdown_group);
 	}
 
-	ao2_ref(listener, -1);
 	return tps;
 }
 
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 6428746..eaee0dd 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -309,7 +309,7 @@
 /*!
  * \brief test taskprocessor listener's start callback
  */
-static int test_start(struct ast_taskprocessor_listener *listener)
+static int test_start(struct ast_taskprocessor *tps)
 {
 	return 0;
 }
@@ -319,9 +319,9 @@
  *
  * Adjusts private data's stats as indicated by the parameters.
  */
-static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+static void test_task_pushed(struct ast_taskprocessor *tps, int was_empty)
 {
-	struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
+	struct test_listener_pvt *pvt = ast_taskprocessor_get_user_data(tps);
 	++pvt->num_pushed;
 	if (was_empty) {
 		++pvt->num_was_empty;
@@ -331,22 +331,22 @@
 /*!
  * \brief test taskprocessor listener's emptied callback.
  */
-static void test_emptied(struct ast_taskprocessor_listener *listener)
+static void test_emptied(struct ast_taskprocessor *tps)
 {
-	struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
+	struct test_listener_pvt *pvt = ast_taskprocessor_get_user_data(tps);
 	++pvt->num_emptied;
 }
 
 /*!
  * \brief test taskprocessor listener's shutdown callback.
  */
-static void test_shutdown(struct ast_taskprocessor_listener *listener)
+static void test_shutdown(struct ast_taskprocessor *tps)
 {
-	struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
+	struct test_listener_pvt *pvt = ast_taskprocessor_get_user_data(tps);
 	pvt->shutdown = 1;
 }
 
-static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
+static const struct ast_taskprocessor_callbacks test_callbacks = {
 	.start = test_start,
 	.task_pushed = test_task_pushed,
 	.emptied = test_emptied,
@@ -409,7 +409,6 @@
 AST_TEST_DEFINE(taskprocessor_listener)
 {
 	struct ast_taskprocessor *tps = NULL;
-	struct ast_taskprocessor_listener *listener = NULL;
 	struct test_listener_pvt *pvt = NULL;
 	enum ast_test_result_state res = AST_TEST_PASS;
 
@@ -431,14 +430,7 @@
 		return AST_TEST_FAIL;
 	}
 
-	listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
-	if (!listener) {
-		ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
-		res = AST_TEST_FAIL;
-		goto test_exit;
-	}
-
-	tps = ast_taskprocessor_create_with_listener("test_listener", listener);
+	tps = ast_taskprocessor_create_with_listener("test_listener", &test_callbacks, pvt);
 	if (!tps) {
 		ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
 		res = AST_TEST_FAIL;
@@ -489,7 +481,6 @@
 	}
 
 test_exit:
-	ao2_cleanup(listener);
 	/* This is safe even if tps is NULL */
 	ast_taskprocessor_unreference(tps);
 	ast_free(pvt);

-- 
To view, visit https://gerrit.asterisk.org/10619
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I105ad352cad14638009fac7bf1faab8580f1458e
Gerrit-Change-Number: 10619
Gerrit-PatchSet: 1
Gerrit-Owner: Corey Farrell <git at cfware.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181112/21b3ab24/attachment-0001.html>


More information about the asterisk-code-review mailing list