[asterisk-commits] mmichelson: branch mmichelson/threadpool r377580 - in /team/mmichelson/thread...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Dec 10 00:13:11 CST 2012
Author: mmichelson
Date: Mon Dec 10 00:13:09 2012
New Revision: 377580
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=377580
Log:
Add threadpool options and accompanying test.
The only test added so far is an idle thread timeout
option. This will greatly aid threadpool users who wish
to maintain a threadpool by allowing for idle threads to
die out as necessary.
Test passes.
Modified:
team/mmichelson/threadpool/include/asterisk/threadpool.h
team/mmichelson/threadpool/main/threadpool.c
team/mmichelson/threadpool/tests/test_threadpool.c
Modified: team/mmichelson/threadpool/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/threadpool.h?view=diff&rev=377580&r1=377579&r2=377580
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/threadpool.h (original)
+++ team/mmichelson/threadpool/include/asterisk/threadpool.h Mon Dec 10 00:13:09 2012
@@ -82,6 +82,18 @@
void *private_data;
};
+struct ast_threadpool_options {
+#define AST_THREADPOOL_OPTIONS_VERSION 1
+ /*! Version of threadpool options in use */
+ int version;
+ /*!
+ * \brief Time limit in seconds for idle threads
+ *
+ * A time of 0 or less will mean an infinite timeout.
+ */
+ int idle_timeout;
+};
+
/*!
* \brief Allocate a threadpool listener
*
@@ -106,7 +118,8 @@
* \retval NULL Failed to create the threadpool
* \retval non-NULL The newly-created threadpool
*/
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size);
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
+ int initial_size, const struct ast_threadpool_options *options);
/*!
* \brief Set the number of threads for the thread pool
Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=377580&r1=377579&r2=377580
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Mon Dec 10 00:13:09 2012
@@ -95,6 +95,8 @@
struct ast_taskprocessor *control_tps;
/*! True if the threadpool is in the process of shutting down */
int shutting_down;
+ /*! Threadpool-specific options */
+ struct ast_threadpool_options options;
};
/*!
@@ -266,6 +268,32 @@
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
}
+static int queued_idle_thread_dead(void *data)
+{
+ struct thread_worker_pair *pair = data;
+
+ ao2_unlink(pair->pool->idle_threads, pair->worker);
+ threadpool_send_state_changed(pair->pool);
+
+ ao2_ref(pair, -1);
+ return 0;
+}
+
+static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
+ if (!pair) {
+ return;
+ }
+ ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
+}
+
/*!
* \brief Execute a task in the threadpool
*
@@ -749,7 +777,13 @@
return listener;
}
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
+struct pool_options_pair {
+ struct ast_threadpool *pool;
+ struct ast_threadpool_options options;
+};
+
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
+ int initial_size, const struct ast_threadpool_options *options)
{
struct ast_threadpool *pool;
struct ast_taskprocessor *tps;
@@ -771,6 +805,7 @@
pool->tps = tps;
ao2_ref(listener, +1);
pool->listener = listener;
+ pool->options = *options;
ast_threadpool_set_size(pool, initial_size);
return pool;
}
@@ -814,6 +849,8 @@
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
+ /*! Options for this threadpool */
+ struct ast_threadpool_options options;
};
/*!
@@ -864,7 +901,7 @@
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
- ast_log(LOG_NOTICE, "Worker dying\n");
+ ast_debug(1, "Destroying worker thread\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
@@ -909,6 +946,7 @@
worker->pool = pool;
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
+ worker->options = pool->options;
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
ao2_ref(worker, -1);
@@ -961,13 +999,28 @@
*/
static int worker_idle(struct worker_thread *worker)
{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + worker->options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000,
+ };
SCOPED_MUTEX(lock, &worker->lock);
if (worker->state != ALIVE) {
return 0;
}
threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) {
- ast_cond_wait(&worker->cond, lock);
+ if (worker->options.idle_timeout <= 0) {
+ ast_cond_wait(&worker->cond, lock);
+ } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ if (!worker->wake_up) {
+ ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
+ threadpool_idle_thread_dead(worker->pool, worker);
+ worker->state = DEAD;
}
worker->wake_up = 0;
return worker->state == ALIVE;
Modified: team/mmichelson/threadpool/tests/test_threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_threadpool.c?view=diff&rev=377580&r1=377579&r2=377580
==============================================================================
--- team/mmichelson/threadpool/tests/test_threadpool.c (original)
+++ team/mmichelson/threadpool/tests/test_threadpool.c Mon Dec 10 00:13:09 2012
@@ -257,6 +257,10 @@
struct ast_threadpool_listener *listener = NULL;
struct simple_task_data *std = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -275,7 +279,7 @@
return AST_TEST_FAIL;
}
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -306,6 +310,10 @@
struct ast_threadpool_listener *listener = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -325,7 +333,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -353,6 +361,10 @@
struct ast_threadpool_listener *listener = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -372,7 +384,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -391,6 +403,62 @@
WAIT_WHILE(tld, tld->num_idle > 2);
res = listener_check(test, listener, 0, 0, 0, 0, 2, 0);
+
+end:
+ if (pool) {
+ ast_threadpool_shutdown(pool);
+ }
+ ao2_cleanup(listener);
+ return res;
+}
+
+AST_TEST_DEFINE(threadpool_thread_timeout)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 5,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_thread_timeout";
+ info->category = "/main/threadpool/";
+ info->summary = "Test threadpool thread timeout";
+ info->description =
+ "Ensure that a thread with a five second timeout dies as expected.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks);
+ if (!listener) {
+ return AST_TEST_FAIL;
+ }
+ tld = listener->private_data;
+
+ pool = ast_threadpool_create(listener, 0, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ ast_threadpool_set_size(pool, 1);
+
+ WAIT_WHILE(tld, tld->num_idle < 1);
+
+ res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ /* The thread should time out after 5 seconds */
+ WAIT_WHILE(tld, tld->num_idle > 0);
+
+ res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
end:
if (pool) {
@@ -407,6 +475,10 @@
struct simple_task_data *std = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -426,7 +498,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -476,6 +548,10 @@
struct simple_task_data *std = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -495,7 +571,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -545,6 +621,10 @@
struct simple_task_data *std3 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -564,7 +644,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -626,6 +706,10 @@
struct simple_task_data *std2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -647,7 +731,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -773,6 +857,10 @@
struct complex_task_data *ctd2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -793,7 +881,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -851,6 +939,10 @@
struct complex_task_data *ctd2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ };
switch (cmd) {
case TEST_INIT:
@@ -873,7 +965,7 @@
}
tld = listener->private_data;
- pool = ast_threadpool_create(listener, 0);
+ pool = ast_threadpool_create(listener, 0, &options);
if (!pool) {
goto end;
}
@@ -940,6 +1032,7 @@
ast_test_unregister(threadpool_push);
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
+ ast_test_unregister(threadpool_thread_timeout);
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -954,6 +1047,7 @@
ast_test_register(threadpool_push);
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
+ ast_test_register(threadpool_thread_timeout);
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);
More information about the asterisk-commits
mailing list