[asterisk-commits] dlee: trunk r381326 - in /trunk: include/asterisk/ main/ tests/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Feb 12 15:46:02 CST 2013


Author: dlee
Date: Tue Feb 12 15:45:59 2013
New Revision: 381326

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=381326
Log:
Add a serializer interface to the threadpool

This patch adds the ability to create a serializer from a thread pool. A
serializer is an ast_taskprocessor with the same contract as a default
taskprocessor (tasks execute serially) except instead of executing out
of a dedicated thread, execution occurs in a thread from an
ast_threadpool. Think of it as a lightweight thread.

While it guarantees that each task will complete before executing the
next, there is no guarantee as to which thread from the pool individual
tasks will execute on. This normally only matters if your code relies on
thread-specific information, such as thread locals.

This patch also fixes a bug in how the 'was_empty' parameter is computed
for the push callback, and gets rid of the unused 'shutting_down' field.

Modified:
    trunk/include/asterisk/threadpool.h
    trunk/main/taskprocessor.c
    trunk/main/threadpool.c
    trunk/tests/test_taskprocessor.c
    trunk/tests/test_threadpool.c

Modified: trunk/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/threadpool.h?view=diff&rev=381326&r1=381325&r2=381326
==============================================================================
--- trunk/include/asterisk/threadpool.h (original)
+++ trunk/include/asterisk/threadpool.h Tue Feb 12 15:45:59 2013
@@ -177,4 +177,33 @@
  * \param pool The pool to shut down
  */
 void ast_threadpool_shutdown(struct ast_threadpool *pool);
+
+/*!
+ * \brief Serialized execution of tasks within a \ref ast_threadpool.
+ *
+ * \since 12.0.0
+ *
+ * A \ref ast_taskprocessor with the same contract as a default taskprocessor
+ * (tasks execute serially) except instead of executing out of a dedicated
+ * thread, execution occurs in a thread from a \ref ast_threadpool. Think of it
+ * as a lightweight thread.
+ *
+ * While it guarantees that each task will complete before executing the next,
+ * there is no guarantee as to which thread from the \c pool individual tasks
+ * will execute. This normally only matters if your code relies on thread
+ * specific information, such as thread locals.
+ *
+ * Use ast_taskprocessor_unreference() to dispose of the returned \ref
+ * ast_taskprocessor.
+ *
+ * Only a single taskprocessor with a given name may exist. This function will fail
+ * if a taskprocessor with the given name already exists.
+ *
+ * \param name Name of the serializer. (must be unique)
+ * \param pool \ref ast_threadpool for execution.
+ * \return \ref ast_taskprocessor for enqueuing work.
+ * \return \c NULL on error.
+ */
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool);
+
 #endif /* ASTERISK_THREADPOOL_H */

Modified: trunk/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/taskprocessor.c?view=diff&rev=381326&r1=381325&r2=381326
==============================================================================
--- trunk/main/taskprocessor.c (original)
+++ trunk/main/taskprocessor.c Tue Feb 12 15:45:59 2013
@@ -75,8 +75,8 @@
 	/*! \brief Taskprocessor singleton list entry */
 	AST_LIST_ENTRY(ast_taskprocessor) list;
 	struct ast_taskprocessor_listener *listener;
-	/*! Indicates if the taskprocessor is in the process of shuting down */
-	unsigned int shutting_down:1;
+	/*! Indicates if the taskprocessor is currently executing a task */
+	unsigned int executing:1;
 };
 
 /*!
@@ -197,6 +197,8 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
+	ast_assert(!pvt->dead);
+
 	if (was_empty) {
 		default_tps_wake_up(pvt, 0);
 	}
@@ -447,10 +449,6 @@
 	struct tps_task *task;
 	SCOPED_AO2LOCK(lock, tps);
 
-	if (tps->shutting_down) {
-		return NULL;
-	}
-
 	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
 		tps->tps_queue_size--;
 	}
@@ -643,6 +641,7 @@
 {
 	struct tps_task *t;
 	int previous_size;
+	int was_empty;
 
 	if (!tps || !task_exe) {
 		ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
@@ -655,8 +654,10 @@
 	ao2_lock(tps);
 	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
 	previous_size = tps->tps_queue_size++;
+	/* The currently executing task counts as still in queue */
+	was_empty = tps->executing ? 0 : previous_size == 0;
 	ao2_unlock(tps);
-	tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
+	tps->listener->callbacks->task_pushed(tps->listener, was_empty);
 	return 0;
 }
 
@@ -665,17 +666,26 @@
 	struct tps_task *t;
 	int size;
 
-	if (!(t = tps_taskprocessor_pop(tps))) {
-		return 0;
-	}
-
-	t->execute(t->datap);
-
-	tps_task_free(t);
-
 	ao2_lock(tps);
+	tps->executing = 1;
+	ao2_unlock(tps);
+
+	t = tps_taskprocessor_pop(tps);
+
+	if (t) {
+		t->execute(t->datap);
+		tps_task_free(t);
+	}
+
+	ao2_lock(tps);
+	/* We need to check size in the same critical section where we reset the
+	 * executing bit. Avoids a race condition where a task is pushed right
+	 * after we pop an empty stack.
+	 */
+	tps->executing = 0;
 	size = tps_taskprocessor_depth(tps);
-	if (tps->stats) {
+	/* If we executed a task, bump the stats */
+	if (t && tps->stats) {
 		tps->stats->_tasks_processed_count++;
 		if (size > tps->stats->max_qsize) {
 			tps->stats->max_qsize = size;
@@ -683,9 +693,9 @@
 	}
 	ao2_unlock(tps);
 
-	if (size == 0 && tps->listener->callbacks->emptied) {
+	/* If we executed a task, check for the transition to empty */
+	if (t && size == 0 && tps->listener->callbacks->emptied) {
 		tps->listener->callbacks->emptied(tps->listener);
-		return 0;
-	}
-	return 1;
-}
+	}
+	return size > 0;
+}

Modified: trunk/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/threadpool.c?view=diff&rev=381326&r1=381325&r2=381326
==============================================================================
--- trunk/main/threadpool.c (original)
+++ trunk/main/threadpool.c Tue Feb 12 15:45:59 2013
@@ -866,7 +866,7 @@
 	if (!pool) {
 		return NULL;
 	}
-	
+
 	tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
 	if (!tps_listener) {
 		return NULL;
@@ -1103,3 +1103,88 @@
 	ast_cond_signal(&worker->cond);
 }
 
+struct serializer {
+	struct ast_threadpool *pool;
+};
+
+static void serializer_dtor(void *obj)
+{
+	struct serializer *ser = obj;
+	ao2_cleanup(ser->pool);
+	ser->pool = NULL;
+}
+
+static struct serializer *serializer_create(struct ast_threadpool *pool)
+{
+	struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor);
+	if (!ser) {
+		return NULL;
+	}
+	ao2_ref(pool, +1);
+	ser->pool = pool;
+	return ser;
+}
+
+static int execute_tasks(void *data)
+{
+       struct ast_taskprocessor *tps = data;
+
+       while (ast_taskprocessor_execute(tps)) {
+	       /* No-op */
+       }
+
+       ast_taskprocessor_unreference(tps);
+       return 0;
+}
+
+static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
+       if (was_empty) {
+	       struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+	       struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+	       ast_threadpool_push(ser->pool, execute_tasks, tps);
+       }
+};
+
+static int serializer_start(struct ast_taskprocessor_listener *listener)
+{
+       /* No-op */
+       return 0;
+}
+
+static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+{
+	struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+	ao2_cleanup(ser);
+}
+
+static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+       .task_pushed = serializer_task_pushed,
+       .start = serializer_start,
+       .shutdown = serializer_shutdown,
+};
+
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+{
+	RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
+	struct ast_taskprocessor *tps = NULL;
+
+	ser = serializer_create(pool);
+	if (!ser) {
+		return NULL;
+	}
+
+	listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
+	if (!listener) {
+		return NULL;
+	}
+	ser = NULL; /* ownership transferred to listener */
+
+	tps = ast_taskprocessor_create_with_listener(name, listener);
+	if (!tps) {
+		return NULL;
+	}
+	listener = NULL; /* ownership transferred to tps */
+
+	return tps;
+}

Modified: trunk/tests/test_taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/trunk/tests/test_taskprocessor.c?view=diff&rev=381326&r1=381325&r2=381326
==============================================================================
--- trunk/tests/test_taskprocessor.c (original)
+++ trunk/tests/test_taskprocessor.c Tue Feb 12 15:45:59 2013
@@ -450,11 +450,193 @@
 	return res;
 }
 
+struct shutdown_data {
+	ast_cond_t in;
+	ast_cond_t out;
+	ast_mutex_t lock;
+	int task_complete;
+	int task_started;
+	int task_stop_waiting;
+};
+
+static void shutdown_data_dtor(void *data)
+{
+	struct shutdown_data *shutdown_data = data;
+	ast_mutex_destroy(&shutdown_data->lock);
+	ast_cond_destroy(&shutdown_data->in);
+	ast_cond_destroy(&shutdown_data->out);
+}
+
+static struct shutdown_data *shutdown_data_create(int dont_wait)
+{
+	RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
+
+	shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
+	if (!shutdown_data) {
+		return NULL;
+	}
+
+	ast_mutex_init(&shutdown_data->lock);
+	ast_cond_init(&shutdown_data->in, NULL);
+	ast_cond_init(&shutdown_data->out, NULL);
+	shutdown_data->task_stop_waiting = dont_wait;
+	ao2_ref(shutdown_data, +1);
+	return shutdown_data;
+}
+
+static int shutdown_task_exec(void *data)
+{
+	struct shutdown_data *shutdown_data = data;
+	SCOPED_MUTEX(lock, &shutdown_data->lock);
+	shutdown_data->task_started = 1;
+	ast_cond_signal(&shutdown_data->out);
+	while (!shutdown_data->task_stop_waiting) {
+		ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
+	}
+	shutdown_data->task_complete = 1;
+	ast_cond_signal(&shutdown_data->out);
+	return 0;
+}
+
+static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
+{
+	struct timeval start = ast_tvnow();
+	struct timespec end = {
+		.tv_sec = start.tv_sec + 5,
+		.tv_nsec = start.tv_usec * 1000
+	};
+	SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+	while (!shutdown_data->task_complete) {
+		if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+			break;
+		}
+	}
+
+	return shutdown_data->task_complete;
+}
+
+static int shutdown_has_completed(struct shutdown_data *shutdown_data)
+{
+	SCOPED_MUTEX(lock, &shutdown_data->lock);
+	return shutdown_data->task_complete;
+}
+
+static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
+{
+	struct timeval start = ast_tvnow();
+	struct timespec end = {
+		.tv_sec = start.tv_sec + 5,
+		.tv_nsec = start.tv_usec * 1000
+	};
+	SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+	while (!shutdown_data->task_started) {
+		if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+			break;
+		}
+	}
+
+	return shutdown_data->task_started;
+}
+
+static void shutdown_poke(struct shutdown_data *shutdown_data)
+{
+	SCOPED_MUTEX(lock, &shutdown_data->lock);
+	shutdown_data->task_stop_waiting = 1;
+	ast_cond_signal(&shutdown_data->in);
+}
+
+static void *tps_shutdown_thread(void *data)
+{
+	struct ast_taskprocessor *tps = data;
+	ast_taskprocessor_unreference(tps);
+	return NULL;
+}
+
+AST_TEST_DEFINE(taskprocessor_shutdown)
+{
+	RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+	RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
+	RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
+	int push_res;
+	int wait_res;
+	int pthread_res;
+	pthread_t shutdown_thread;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "taskprocessor_shutdown";
+		info->category = "/main/taskprocessor/";
+		info->summary = "Test of taskproccesor shutdown sequence";
+		info->description =
+			"Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
+	task1 = shutdown_data_create(0); /* task1 waits to be poked */
+	task2 = shutdown_data_create(1); /* task2 waits for nothing */
+
+	if (!tps || !task1 || !task2) {
+		ast_test_status_update(test, "Allocation error\n");
+		return AST_TEST_FAIL;
+	}
+
+	push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
+	if (push_res != 0) {
+		ast_test_status_update(test, "Could not push task1\n");
+		return AST_TEST_FAIL;
+	}
+
+	push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
+	if (push_res != 0) {
+		ast_test_status_update(test, "Could not push task2\n");
+		return AST_TEST_FAIL;
+	}
+
+	wait_res = shutdown_waitfor_start(task1);
+	if (!wait_res) {
+		ast_test_status_update(test, "Task1 didn't start\n");
+		return AST_TEST_FAIL;
+	}
+
+	pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
+	if (pthread_res != 0) {
+		ast_test_status_update(test, "Failed to create shutdown thread\n");
+		return AST_TEST_FAIL;
+	}
+	tps = NULL;
+
+	/* Wakeup task1; it should complete */
+	shutdown_poke(task1);
+	wait_res = shutdown_waitfor_completion(task1);
+	if (!wait_res) {
+		ast_test_status_update(test, "Task1 didn't complete\n");
+		return AST_TEST_FAIL;
+	}
+
+	/* Wait for shutdown to complete */
+	pthread_join(shutdown_thread, NULL);
+
+	/* Should have also completed task2 */
+	wait_res = shutdown_has_completed(task2);
+	if (!wait_res) {
+		ast_test_status_update(test, "Task2 didn't finish\n");
+		return AST_TEST_FAIL;
+	}
+
+	return AST_TEST_PASS;
+}
+
 static int unload_module(void)
 {
 	ast_test_unregister(default_taskprocessor);
 	ast_test_unregister(default_taskprocessor_load);
 	ast_test_unregister(taskprocessor_listener);
+	ast_test_unregister(taskprocessor_shutdown);
 	return 0;
 }
 
@@ -463,6 +645,7 @@
 	ast_test_register(default_taskprocessor);
 	ast_test_register(default_taskprocessor_load);
 	ast_test_register(taskprocessor_listener);
+	ast_test_register(taskprocessor_shutdown);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 

Modified: trunk/tests/test_threadpool.c
URL: http://svnview.digium.com/svn/asterisk/trunk/tests/test_threadpool.c?view=diff&rev=381326&r1=381325&r2=381326
==============================================================================
--- trunk/tests/test_threadpool.c (original)
+++ trunk/tests/test_threadpool.c Tue Feb 12 15:45:59 2013
@@ -31,12 +31,13 @@
 
 #include "asterisk.h"
 
+#include "asterisk/astobj2.h"
+#include "asterisk/lock.h"
+#include "asterisk/logger.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
 #include "asterisk/test.h"
 #include "asterisk/threadpool.h"
-#include "asterisk/module.h"
-#include "asterisk/lock.h"
-#include "asterisk/astobj2.h"
-#include "asterisk/logger.h"
 
 struct test_listener_data {
 	int num_active;
@@ -1124,11 +1125,12 @@
 }
 
 struct complex_task_data {
+	int task_started;
 	int task_executed;
 	int continue_task;
 	ast_mutex_t lock;
 	ast_cond_t stall_cond;
-	ast_cond_t done_cond;
+	ast_cond_t notify_cond;
 };
 
 static struct complex_task_data *complex_task_data_alloc(void)
@@ -1140,7 +1142,7 @@
 	}
 	ast_mutex_init(&ctd->lock);
 	ast_cond_init(&ctd->stall_cond, NULL);
-	ast_cond_init(&ctd->done_cond, NULL);
+	ast_cond_init(&ctd->notify_cond, NULL);
 	return ctd;
 }
 
@@ -1148,12 +1150,15 @@
 {
 	struct complex_task_data *ctd = data;
 	SCOPED_MUTEX(lock, &ctd->lock);
+	/* Notify that we started */
+	ctd->task_started = 1;
+	ast_cond_signal(&ctd->notify_cond);
 	while (!ctd->continue_task) {
 		ast_cond_wait(&ctd->stall_cond, lock);
 	}
 	/* We got poked. Finish up */
 	ctd->task_executed = 1;
-	ast_cond_signal(&ctd->done_cond);
+	ast_cond_signal(&ctd->notify_cond);
 	return 0;
 }
 
@@ -1164,18 +1169,54 @@
 	ast_cond_signal(&ctd->stall_cond);
 }
 
-static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
+static int wait_for_complex_start(struct complex_task_data *ctd)
 {
 	struct timeval start = ast_tvnow();
 	struct timespec end = {
 		.tv_sec = start.tv_sec + 5,
 		.tv_nsec = start.tv_usec * 1000
 	};
+	SCOPED_MUTEX(lock, &ctd->lock);
+
+	while (!ctd->task_started) {
+		if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+			break;
+		}
+	}
+
+	return ctd->task_started;
+}
+
+static int has_complex_started(struct complex_task_data *ctd)
+{
+	struct timeval start = ast_tvnow();
+	struct timespec end = {
+		.tv_sec = start.tv_sec + 1,
+		.tv_nsec = start.tv_usec * 1000
+	};
+	SCOPED_MUTEX(lock, &ctd->lock);
+
+	while (!ctd->task_started) {
+		if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+			break;
+		}
+	}
+
+	return ctd->task_started;
+}
+
+static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
+{
+	struct timeval start = ast_tvnow();
+	struct timespec end = {
+		.tv_sec = start.tv_sec + 5,
+		.tv_nsec = start.tv_usec * 1000
+	};
 	enum ast_test_result_state res = AST_TEST_PASS;
 	SCOPED_MUTEX(lock, &ctd->lock);
 
 	while (!ctd->task_executed) {
-		if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) {
+		if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
 			break;
 		}
 	}
@@ -1391,6 +1432,177 @@
 	return res;
 }
 
+AST_TEST_DEFINE(threadpool_serializer)
+{
+	int started = 0;
+	int finished = 0;
+	enum ast_test_result_state res = AST_TEST_FAIL;
+	struct ast_threadpool *pool = NULL;
+	struct ast_taskprocessor *uut = NULL;
+	struct complex_task_data *data1 = NULL;
+	struct complex_task_data *data2 = NULL;
+	struct complex_task_data *data3 = NULL;
+	struct ast_threadpool_options options = {
+		.version = AST_THREADPOOL_OPTIONS_VERSION,
+		.idle_timeout = 0,
+		.auto_increment = 0,
+		.initial_size = 2,
+		.max_size = 0,
+	};
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "threadpool_serializer";
+		info->category = "/main/threadpool/";
+		info->summary = "Test that serializers";
+		info->description =
+			"Ensures that tasks enqueued to a serialize execute in sequence.\n";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+	if (!pool) {
+		ast_test_status_update(test, "Could not create threadpool\n");
+		goto end;
+	}
+	uut = ast_threadpool_serializer("ser1", pool);
+	data1 = complex_task_data_alloc();
+	data2 = complex_task_data_alloc();
+	data3 = complex_task_data_alloc();
+	if (!uut || !data1 || !data2 || !data3) {
+		ast_test_status_update(test, "Allocation failed\n");
+		goto end;
+	}
+
+	/* This should start right away */
+	if (ast_taskprocessor_push(uut, complex_task, data1)) {
+		ast_test_status_update(test, "Failed to enqueue data1\n");
+		goto end;
+	}
+	started = wait_for_complex_start(data1);
+	if (!started) {
+		ast_test_status_update(test, "Failed to start data1\n");
+		goto end;
+	}
+
+	/* This should not start until data 1 is complete */
+	if (ast_taskprocessor_push(uut, complex_task, data2)) {
+		ast_test_status_update(test, "Failed to enqueue data2\n");
+		goto end;
+	}
+	started = has_complex_started(data2);
+	if (started) {
+		ast_test_status_update(test, "data2 started out of order\n");
+		goto end;
+	}
+
+	/* But the free thread in the pool can still run */
+	if (ast_threadpool_push(pool, complex_task, data3)) {
+		ast_test_status_update(test, "Failed to enqueue data3\n");
+	}
+	started = wait_for_complex_start(data3);
+	if (!started) {
+		ast_test_status_update(test, "Failed to start data3\n");
+		goto end;
+	}
+
+	/* Finishing data1 should allow data2 to start */
+	poke_worker(data1);
+	finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
+	if (!finished) {
+		ast_test_status_update(test, "data1 couldn't finish\n");
+		goto end;
+	}
+	started = wait_for_complex_start(data2);
+	if (!started) {
+		ast_test_status_update(test, "Failed to start data2\n");
+		goto end;
+	}
+
+	/* Finish up */
+	poke_worker(data2);
+	finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
+	if (!finished) {
+		ast_test_status_update(test, "data2 couldn't finish\n");
+		goto end;
+	}
+	poke_worker(data3);
+	finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
+	if (!finished) {
+		ast_test_status_update(test, "data3 couldn't finish\n");
+		goto end;
+	}
+
+	res = AST_TEST_PASS;
+
+end:
+	poke_worker(data1);
+	poke_worker(data2);
+	poke_worker(data3);
+	ast_taskprocessor_unreference(uut);
+	ast_threadpool_shutdown(pool);
+	ast_free(data1);
+	ast_free(data2);
+	ast_free(data3);
+	return res;
+}
+
+AST_TEST_DEFINE(threadpool_serializer_dupe)
+{
+	enum ast_test_result_state res = AST_TEST_FAIL;
+	struct ast_threadpool *pool = NULL;
+	struct ast_taskprocessor *uut = NULL;
+	struct ast_taskprocessor *there_can_be_only_one = NULL;
+	struct ast_threadpool_options options = {
+		.version = AST_THREADPOOL_OPTIONS_VERSION,
+		.idle_timeout = 0,
+		.auto_increment = 0,
+		.initial_size = 2,
+		.max_size = 0,
+	};
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "threadpool_serializer_dupe";
+		info->category = "/main/threadpool/";
+		info->summary = "Test that serializers are uniquely named";
+		info->description =
+			"Creating two serializers with the same name should\n"
+			"result in error.\n";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+	if (!pool) {
+		ast_test_status_update(test, "Could not create threadpool\n");
+		goto end;
+	}
+
+	uut = ast_threadpool_serializer("highlander", pool);
+	if (!uut) {
+		ast_test_status_update(test, "Allocation failed\n");
+		goto end;
+	}
+
+	there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
+	if (there_can_be_only_one) {
+		ast_taskprocessor_unreference(there_can_be_only_one);
+		ast_test_status_update(test, "Duplicate name error\n");
+		goto end;
+	}
+
+	res = AST_TEST_PASS;
+
+end:
+	ast_taskprocessor_unreference(uut);
+	ast_threadpool_shutdown(pool);
+	return res;
+}
+
 static int unload_module(void)
 {
 	ast_test_unregister(threadpool_push);
@@ -1406,6 +1618,8 @@
 	ast_test_unregister(threadpool_reactivation);
 	ast_test_unregister(threadpool_task_distribution);
 	ast_test_unregister(threadpool_more_destruction);
+	ast_test_unregister(threadpool_serializer);
+	ast_test_unregister(threadpool_serializer_dupe);
 	return 0;
 }
 
@@ -1424,6 +1638,8 @@
 	ast_test_register(threadpool_reactivation);
 	ast_test_register(threadpool_task_distribution);
 	ast_test_register(threadpool_more_destruction);
+	ast_test_register(threadpool_serializer);
+	ast_test_register(threadpool_serializer_dupe);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 




More information about the asterisk-commits mailing list