[asterisk-commits] mmichelson: branch mmichelson/threadpool r379120 - in /team/mmichelson/thread...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Jan 15 12:40:40 CST 2013


Author: mmichelson
Date: Tue Jan 15 12:40:36 2013
New Revision: 379120

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=379120
Log:
Remove alloc and destroy callbacks from the taskprocessor.

User data is now allocated by the creator of the taskprocessor listener
and passed into ast_taskprocessor_listener_alloc(). Similarly, freeing
the user data is left up to that creator: the data can be freed in the
listener's shutdown callback when the taskprocessor shuts down, or it can
be held onto if it makes sense to do so.

This, unsurprisingly, makes threadpool allocation a LOT cleaner now.


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/main/taskprocessor.c
    team/mmichelson/threadpool/main/threadpool.c
    team/mmichelson/threadpool/tests/test_taskprocessor.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=379120&r1=379119&r2=379120
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Tue Jan 15 12:40:36 2013
@@ -75,17 +75,6 @@
 
 struct ast_taskprocessor_listener_callbacks {
 	/*!
-	 * \brief Allocate the listener's private data
-	 *
-	 * This is called during taskprocesor creation.
-	 * It is not necessary to assign the private data to the listener.
-	 *
-	 * \param listener The listener to which the private data belongs
-	 * \retval NULL Error while attempting to initialize private data
-	 * \retval non-NULL Allocated private data
-	 */
-	void *(*alloc)(struct ast_taskprocessor_listener *listener);
-	/*!
 	 * \brief The taskprocessor has started completely
 	 *
 	 * This indicates that the taskprocessor is fully set up and the listener
@@ -111,7 +100,8 @@
 	 * \brief Indicates the taskprocessor wishes to die.
 	 *
 	 * All operations on the task processor must to be stopped in
-	 * this callback.
+	 * this callback. This is an opportune time to free the listener's
+	 * user data if it is not going to be used anywhere else.
 	 *
 	 * After this callback returns, it is NOT safe to operate on the
 	 * listener's reference to the taskprocessor.
@@ -119,15 +109,6 @@
 	 * \param listener The listener
 	 */
 	void (*shutdown)(struct ast_taskprocessor_listener *listener);
-	/*!
-	 * \brief Destroy the listener's private data
-	 *
-	 * It is required that you free the private data in this callback
-	 * in addition to the private data's individual fields.
-	 *
-	 * \param private_data The listener's private data
-	 */
-	void (*destroy)(void *private_data);
 };
 
 /*!
@@ -146,7 +127,7 @@
 	/*! The taskprocessor that the listener is listening to */
 	struct ast_taskprocessor *tps;
 	/*! Data private to the listener */
-	void *private_data;
+	void *user_data;
 };
 
 /*!
@@ -158,10 +139,11 @@
  * callbacks.
  *
  * \param callbacks The callbacks to assign to the listener
+ * \param user_data The user data for the listener
  * \retval NULL Failure
  * \retval non-NULL The newly allocated taskprocessor listener
  */
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks);
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data);
 
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary

Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=379120&r1=379119&r2=379120
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Tue Jan 15 12:40:36 2013
@@ -151,7 +151,7 @@
 {
 	struct ast_taskprocessor_listener *listener = data;
 	struct ast_taskprocessor *tps = listener->tps;
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 	int dead = 0;
 
 	while (!dead) {
@@ -162,23 +162,9 @@
 	return NULL;
 }
 
-static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
-{
-	struct default_taskprocessor_listener_pvt *pvt;
-
-	pvt = ast_calloc(1, sizeof(*pvt));
-	if (!pvt) {
-		return NULL;
-	}
-	ast_cond_init(&pvt->cond, NULL);
-	ast_mutex_init(&pvt->lock);
-	pvt->poll_thread = AST_PTHREADT_NULL;
-	return pvt;
-}
-
 static int default_listener_start(struct ast_taskprocessor_listener *listener)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
 	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
 		return -1;
@@ -189,41 +175,33 @@
 
 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
 	if (was_empty) {
 		default_tps_wake_up(pvt, 0);
 	}
 }
 
-static void default_emptied(struct ast_taskprocessor_listener *listener)
-{
-	/* No-op */
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+{
+	ast_mutex_destroy(&pvt->lock);
+	ast_cond_destroy(&pvt->cond);
+	ast_free(pvt);
 }
 
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 	default_tps_wake_up(pvt, 1);
 	pthread_join(pvt->poll_thread, NULL);
 	pvt->poll_thread = AST_PTHREADT_NULL;
-}
-
-static void default_listener_destroy(void *obj)
-{
-	struct default_taskprocessor_listener_pvt *pvt = obj;
-	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
-	ast_free(pvt);
+	default_listener_pvt_destroy(pvt);
 }
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
-	.alloc = default_listener_alloc,
 	.start = default_listener_start,
 	.task_pushed = default_task_pushed,
-	.emptied = default_emptied,
 	.shutdown = default_listener_shutdown,
-	.destroy = default_listener_destroy,
 };
 
 /*!
@@ -474,31 +452,39 @@
 	return tps->name;
 }
 
-static void listener_destroy(void *obj)
-{
-	struct ast_taskprocessor_listener *listener = obj;
-
-	listener->callbacks->destroy(listener->private_data);
-}
-
 static void listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
 	listener->callbacks->shutdown(listener);
 	ao2_ref(listener->tps, -1);
 }
 
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 {
 	RAII_VAR(struct ast_taskprocessor_listener *, listener,
-			ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+			ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
 
 	if (!listener) {
 		return NULL;
 	}
 	listener->callbacks = callbacks;
+	listener->user_data = user_data;
 
 	ao2_ref(listener, +1);
 	return listener;
+}
+
+static void *default_listener_pvt_alloc(void)
+{
+	struct default_taskprocessor_listener_pvt *pvt;
+
+	pvt = ast_calloc(1, sizeof(*pvt));
+	if (!pvt) {
+		return NULL;
+	}
+	ast_cond_init(&pvt->cond, NULL);
+	ast_mutex_init(&pvt->lock);
+	pvt->poll_thread = AST_PTHREADT_NULL;
+	return pvt;
 }
 
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
@@ -508,6 +494,7 @@
 {
 	struct ast_taskprocessor *p;
 	struct ast_taskprocessor_listener *listener;
+	struct default_taskprocessor_listener_pvt *pvt;
 
 	if (ast_strlen_zero(name)) {
 		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
@@ -522,13 +509,19 @@
 		return NULL;
 	}
 	/* Create a new taskprocessor. Start by creating a default listener */
-	listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks);
+	pvt = default_listener_pvt_alloc();
+	if (!pvt) {
+		return NULL;
+	}
+	listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
 	if (!listener) {
+		default_listener_pvt_destroy(pvt);
 		return NULL;
 	}
 
 	p = ast_taskprocessor_create_with_listener(name, listener);
 	if (!p) {
+		default_listener_pvt_destroy(pvt);
 		ao2_ref(listener, -1);
 		return NULL;
 	}
@@ -564,14 +557,6 @@
 
 	ao2_ref(p, +1);
 	listener->tps = p;
-
-	/* Allocation of private data must come after setting taskprocessor parameters
-	 * so that listeners who rely on taskprocessor data will have access to it.
-	 */
-	listener->private_data = listener->callbacks->alloc(listener);
-	if (!listener->private_data) {
-		return NULL;
-	}
 
 	if (!(ao2_link(tps_singletons, p))) {
 		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
@@ -656,7 +641,7 @@
 	}
 	ao2_unlock(tps);
 
-	if (size == 0) {
+	if (size == 0 && tps->listener->callbacks->emptied) {
 		tps->listener->callbacks->emptied(tps->listener);
 		return 0;
 	}

Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=379120&r1=379119&r2=379120
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Tue Jan 15 12:40:36 2013
@@ -365,23 +365,25 @@
  * is because the threadpool exists as the private data on a taskprocessor
  * listener.
  *
- * \param listener The taskprocessor listener where the threadpool will live.
+ * \param name The name of the threadpool.
+ * \param options The options the threadpool uses.
  * \retval NULL Could not initialize threadpool properly
  * \retval non-NULL The newly-allocated threadpool
  */
-static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
+static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
 {
 	RAII_VAR(struct ast_threadpool *, pool,
 			ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
-	struct ast_str *name = ast_str_create(64);
-
-	if (!name) {
-		return NULL;
-	}
-
-	ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps));
-
-	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT);
+	struct ast_str *control_tps_name = ast_str_create(64);
+
+	if (!control_tps_name) {
+		return NULL;
+	}
+
+	ast_str_set(&control_tps_name, 0, "%s-control", name);
+
+	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
+	ast_free(control_tps_name);
 	if (!pool->control_tps) {
 		return NULL;
 	}
@@ -397,6 +399,7 @@
 	if (!pool->zombie_threads) {
 		return NULL;
 	}
+	pool->options = *options;
 
 	ao2_ref(pool, +1);
 	return pool;
@@ -545,7 +548,7 @@
 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
 		int was_empty)
 {
-	struct ast_threadpool *pool = listener->private_data;
+	struct ast_threadpool *pool = listener->user_data;
 	struct task_pushed_data *tpd;
 	SCOPED_AO2LOCK(lock, pool);
 
@@ -585,7 +588,7 @@
  */
 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
 {
-	struct ast_threadpool *pool = listener->private_data;
+	struct ast_threadpool *pool = listener->user_data;
 	SCOPED_AO2LOCK(lock, pool);
 
 	if (pool->shutting_down) {
@@ -608,26 +611,11 @@
  */
 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
 {
-	struct ast_threadpool *pool = listener->private_data;
+	struct ast_threadpool *pool = listener->user_data;
 
 	ao2_cleanup(pool->active_threads);
 	ao2_cleanup(pool->idle_threads);
 	ao2_cleanup(pool->zombie_threads);
-}
-
-/*!
- * \brief Taskprocessor listener destroy callback
- *
- * Since the threadpool is an ao2 object, all that is necessary is to
- * decrease the refcount. Since the control taskprocessor should already
- * be destroyed by this point, this should be the final reference to the
- * threadpool.
- *
- * \param private_data The threadpool to destroy
- */
-static void threadpool_destroy(void *private_data)
-{
-	struct ast_threadpool *pool = private_data;
 	ao2_cleanup(pool);
 }
 
@@ -635,12 +623,10 @@
  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
  */
 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
-	.alloc = threadpool_alloc,
 	.start = threadpool_tps_start,
 	.task_pushed = threadpool_tps_task_pushed,
 	.emptied = threadpool_tps_emptied,
 	.shutdown = threadpool_tps_shutdown,
-	.destroy = threadpool_destroy,
 };
 
 /*!
@@ -854,12 +840,15 @@
 		struct ast_threadpool_listener *listener,
 		int initial_size, const struct ast_threadpool_options *options)
 {
-	struct ast_threadpool *pool;
 	struct ast_taskprocessor *tps;
-	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
-			ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
-			ao2_cleanup);
-
+	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup);
+
+	if (!pool) {
+		return NULL;
+	}
+	
+	tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
 	if (!tps_listener) {
 		return NULL;
 	}
@@ -870,19 +859,17 @@
 	}
 
 	tps = ast_taskprocessor_create_with_listener(name, tps_listener);
-
 	if (!tps) {
 		return NULL;
 	}
 
-	pool = tps_listener->private_data;
 	pool->tps = tps;
 	if (listener) {
 		ao2_ref(listener, +1);
 		pool->listener = listener;
 	}
-	pool->options = *options;
 	ast_threadpool_set_size(pool, initial_size);
+	ao2_ref(pool, +1);
 	return pool;
 }
 

Modified: team/mmichelson/threadpool/tests/test_taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_taskprocessor.c?view=diff&rev=379120&r1=379119&r2=379120
==============================================================================
--- team/mmichelson/threadpool/tests/test_taskprocessor.c (original)
+++ team/mmichelson/threadpool/tests/test_taskprocessor.c Tue Jan 15 12:40:36 2013
@@ -260,7 +260,7 @@
 /*!
  * \brief test taskprocessor listener's alloc callback
  */
-static void *test_alloc(struct ast_taskprocessor_listener *listener)
+static void *test_listener_pvt_alloc(void)
 {
 	struct test_listener_pvt *pvt;
 
@@ -283,7 +283,7 @@
  */
 static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
-	struct test_listener_pvt *pvt = listener->private_data;
+	struct test_listener_pvt *pvt = listener->user_data;
 	++pvt->num_pushed;
 	if (was_empty) {
 		++pvt->num_was_empty;
@@ -295,7 +295,7 @@
  */
 static void test_emptied(struct ast_taskprocessor_listener *listener)
 {
-	struct test_listener_pvt *pvt = listener->private_data;
+	struct test_listener_pvt *pvt = listener->user_data;
 	++pvt->num_emptied;
 }
 
@@ -304,26 +304,15 @@
  */
 static void test_shutdown(struct ast_taskprocessor_listener *listener)
 {
-	struct test_listener_pvt *pvt = listener->private_data;
+	struct test_listener_pvt *pvt = listener->user_data;
 	pvt->shutdown = 1;
 }
 
-/*!
- * \brief test taskprocessor listener's destroy callback.
- */
-static void test_destroy(void *private_data)
-{
-	struct test_listener_pvt *pvt = private_data;
-	ast_free(pvt);
-}
-
 static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
-	.alloc = test_alloc,
 	.start = test_start,
 	.task_pushed = test_task_pushed,
 	.emptied = test_emptied,
 	.shutdown = test_shutdown,
-	.destroy = test_destroy,
 };
 
 /*!
@@ -381,9 +370,9 @@
  */
 AST_TEST_DEFINE(taskprocessor_listener)
 {
-	struct ast_taskprocessor *tps;
-	struct ast_taskprocessor_listener *listener;
-	struct test_listener_pvt *pvt;
+	struct ast_taskprocessor *tps = NULL;
+	struct ast_taskprocessor_listener *listener = NULL;
+	struct test_listener_pvt *pvt = NULL;
 	enum ast_test_result_state res = AST_TEST_PASS;
 
 	switch (cmd) {
@@ -398,10 +387,17 @@
 		break;
 	}
 
-	listener = ast_taskprocessor_listener_alloc(&test_callbacks);
+	pvt = test_listener_pvt_alloc();
+	if (!pvt) {
+		ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
+		return AST_TEST_FAIL;
+	}
+
+	listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
 	if (!listener) {
 		ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
-		return AST_TEST_FAIL;
+		res = AST_TEST_FAIL;
+		goto test_exit;
 	}
 
 	tps = ast_taskprocessor_create_with_listener("test_listener", listener);
@@ -411,8 +407,6 @@
 		goto test_exit;
 	}
 
-	pvt = listener->private_data;
-
 	ast_taskprocessor_push(tps, listener_test_task, NULL);
 
 	if (check_stats(test, pvt, 1, 0, 1) < 0) {
@@ -449,9 +443,10 @@
 	}
 
 test_exit:
-	ao2_ref(listener, -1);
+	ao2_cleanup(listener);
 	/* This is safe even if tps is NULL */
 	ast_taskprocessor_unreference(tps);
+	ast_free(pvt);
 	return res;
 }
 




More information about the asterisk-commits mailing list