[asterisk-commits] mmichelson: branch mmichelson/threadpool r376898 - /team/mmichelson/threadpoo...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Nov 29 18:19:53 CST 2012
Author: mmichelson
Date: Thu Nov 29 18:19:50 2012
New Revision: 376898
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=376898
Log:
Some more progress.
Still does not compile.
Modified:
team/mmichelson/threadpool/main/threadpool.c
Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=376898&r1=376897&r2=376898
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Thu Nov 29 18:19:50 2012
@@ -22,7 +22,16 @@
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
-struct ast_threadpool;
+#define THREAD_BUCKETS 89
+
+static int id_counter;
+
+struct ast_threadpool {
+ struct ast_threadpool_listener *threadpool_listener;
+ struct ao2_container *active_threads;
+ struct ao2_container *idle_threads;
+ struct ao2_container *zombie_threads;
+}
enum worker_state {
ALIVE,
@@ -31,14 +40,130 @@
};
struct worker_thread {
+ int id;
ast_cond_t cond;
ast_mutex_t lock;
pthread_t thread;
struct ast_threadpool *pool;
- AST_LIST_ENTRY(struct worker_thread) next;
+ enum worker_state state;
int wake_up;
- enum worker_state state;
};
+
+static int worker_thread_hash(const void *obj)
+{
+ struct worker_thread *worker= obj;
+
+ return worker->id;
+}
+
+static int worker_thread_cmp(void *obj, void *arg, int flags)
+{
+ struct worker_thread *worker1 = obj;
+ struct worker_thread *worker2 = arg;
+
+ return worker1->id == worker2->id ? CMP_MATCH : 0;
+}
+
+static worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
+{
+ struct worker_thread *worker = ao2_alloc(1, sizeof(*worker));
+ if (!worker) {
+ /* XXX Dangit! */
+ return NULL;
+ }
+ worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
+ ast_mutex_init(&worker->lock);
+ ast_cond_init(&worker->cond, NULL);
+ worker->pool = pool;
+ worker->thread = AST_PTHREADT_NULL;
+ worker->state = ALIVE;
+ if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) {
+ /* XXX Poop! */
+ ao2_ref(worker, -1);
+ return NULL;
+ }
+ return worker;
+}
+
+static void threadpool_send_state_changed(struct ast_threadpool *pool)
+{
+ int active_size = ao2_container_count(pool->active_threads);
+ int idle_size = ao2_container_count(pool->idle_threads);
+ int zombie_size = ao2_container_count(pool->zombie_threads);
+
+ pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
+}
+
+struct thread_worker_pair {
+ struct ast_threadpool *pool;
+ struct worker_thread *worker;
+};
+
+static void thread_worker_pair_destructor(void *obj)
+{
+ struct thread_worker_pair *pair = obj;
+ ao2_ref(pair->pool, -1);
+ ao2_ref(pair->worker, -1);
+}
+
+struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
+ if (!pair) {
+ /*XXX Crap */
+ return NULL;
+ }
+ pair->pool = ao2_ref(pool);
+ pair->worker = ao2_ref(worker);
+ return pair;
+}
+
+static int queued_active_thread_idle(void *data)
+{
+ struct thread_worker_pair *pair = data;
+
+ ao2_link(pair->pool->idle_threads, pair->worker);
+ ao2_unlink(pair->pool->active_threads, pair->worker);
+
+ threadpool_send_state_changed(pair->pool);
+
+ ao2_ref(pair, -1);
+ return 0;
+}
+
+static void threadpool_active_thread_idle(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
+ if (!pair) {
+ /*XXX Crap */
+ return;
+ }
+ ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair));
+}
+
+static int queued_zombie_thread_dead(void *data)
+{
+ struct thread_worker_pair *pair = data;
+
+ ao2_unlink(pair->pool->zombie_threads, pair->worker);
+ threadpool_send_state_changed(pair->pool);
+
+ ao2_ref(pair, -1);
+ return 0;
+}
+
+static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
+ if (!pair) {
+ /* XXX Crap */
+ return;
+ }
+ ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair));
+}
static int worker_idle(struct worker_thread *worker)
{
@@ -78,19 +203,31 @@
return 0;
}
-struct ast_threadpool {
- struct ast_threadpool_listener *threadpool_listener;
- int active_threads;
- int idle_threads;
- int zombie_threads;
-}
static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
{
- RAII_VAR(ast_threadpool *, threadpool,
- ao2_alloc(sizeof(*threadpool), threadpool_destroy), ao2_cleanup);
-
- return threadpool;
+ RAII_VAR(ast_threadpool *, pool,
+ ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup);
+
+ pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT);
+ if (!pool->control_tps) {
+ return NULL;
+ }
+ pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+ if (!pool->active_threads) {
+ return NULL;
+ }
+ pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+ if (!pool->idle_threads) {
+ return NULL;
+ }
+ pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+ if (!pool->zombie_thread) {
+ return NULL;
+ }
+
+ ao2_ref(pool, +1);
+ return pool;
}
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener)
@@ -140,8 +277,116 @@
return ast_taskprocessor_create_with_listener(tps_listener);
}
+static void grow(struct ast_threadpool *pool, int delta)
+{
+ int i;
+ for (i = 0; i < delta; ++i) {
+ struct worker_thread *worker = worker_thread_alloc(pool);
+ if (!worker) {
+ /* XXX Abandon */
+ return;
+ }
+ ao2_link(pool->active_threads, worker);
+ }
+}
+
+static int kill_threads(void *obj, void *arg, int flags)
+{
+ int *num_to_kill = arg;
+
+ if ((*num_to_kill)-- > 0) {
+ return CMP_MATCH;
+ } else {
+ return CMP_STOP;
+ }
+}
+
+static int zombify_threads(void *obj, void *arg, void *data, int flags)
+{
+ struct worker_thread *worker = obj;
+ struct ast_threadpool *pool = arg;
+ int *num_to_zombify = data;
+
+ if ((*num_to_zombify)-- > 0) {
+ ao2_link(pool->zombie_threads, worker);
+ return CMP_MATCH;
+ } else {
+ return CMP_STOP;
+ }
+}
+
+static void shrink(struct ast_threadpool *pool, int delta)
+{
+ /*
+ * Preference is to kill idle threads, but
+ * we'll move on to deactivating active threads
+ * if we have to
+ */
+ int idle_threads = ao2_container_count(pool->idle_threads);
+ int idle_threads_to_kill = MIN(delta, idle_threads);
+ int active_threads_to_zombify = delta - idle_threads_to_kill;
+ int i = 0;
+
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
+ kill_threads, &idle_threads_to_kill);
+
+ ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
+ zombify_threads, pool, &active_threads_to_zombify);
+}
+
+struct set_size_data {
+ struct threadpool *pool;
+ int size;
+};
+
+void set_size_data_destroy(void *obj)
+{
+ struct set_size_data *ssd = obj;
+ ao2_ref(ssd->pool, -1);
+}
+
+static int queued_set_size(void *data)
+{
+ struct set_size_data *ssd = data;
+ struct ast_threadpool *pool = ssd->pool;
+ int num_threads = ssd->size;
+
+ /* We don't count zombie threads as being "live when potentially resizing */
+ int current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
+
+ if (current_size = num_threads) {
+ return 0;
+ }
+
+ if (current_size < num_threads) {
+ grow(pool, num_threads - current_size);
+ } else {
+ shrink(pool, current_size - num_threads);
+ }
+
+ threadpool_send_state_changed(pool);
+ ao2_ref(set_size_data, -1);
+}
+
void ast_threadpool_set_size(struct ast_threadpool *pool, int size)
{
+ struct set_size_data *ssd;
+ if (size < 0) {
+ ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size);
+ return;
+ }
+
+ ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
+ if (!ssd) {
+ /* XXX Crap */
+ return;
+ }
+
+ ssd->pool = ao2_ref(pool);
+ ssd->size = size;
+
+ ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
}
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
More information about the asterisk-commits
mailing list