[asterisk-commits] branch file/iax2-threading r16801 - in
/team/file/iax2-threading: channels/ c...
asterisk-commits at lists.digium.com
asterisk-commits at lists.digium.com
Fri Mar 31 13:24:26 MST 2006
Author: file
Date: Fri Mar 31 14:24:24 2006
New Revision: 16801
URL: http://svn.digium.com/view/asterisk?rev=16801&view=rev
Log:
Convert the IAX2 multithreading code from ASTOBJ containers to linked lists, and add dynamic threads. These are threads that are created only under heavy load (when no idle pool threads are available). Once the load dies down, they exit after sitting idle and are created again when needed.
Modified:
team/file/iax2-threading/channels/chan_iax2.c
team/file/iax2-threading/configs/iax.conf.sample
Modified: team/file/iax2-threading/channels/chan_iax2.c
URL: http://svn.digium.com/view/asterisk/team/file/iax2-threading/channels/chan_iax2.c?rev=16801&r1=16800&r2=16801&view=diff
==============================================================================
--- team/file/iax2-threading/channels/chan_iax2.c (original)
+++ team/file/iax2-threading/channels/chan_iax2.c Fri Mar 31 14:24:24 2006
@@ -92,6 +92,7 @@
#include "asterisk/devicestate.h"
#include "asterisk/netsock.h"
#include "asterisk/stringfields.h"
+#include "asterisk/linkedlists.h"
#include "iax2.h"
#include "iax2-parser.h"
@@ -134,6 +135,7 @@
#define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
#define DEFAULT_THREAD_COUNT 10
+#define DEFAULT_MAX_THREAD_COUNT 100
#define DEFAULT_RETRY_TIME 1000
#define MEMORY_SIZE 100
#define DEFAULT_DROP 3
@@ -444,6 +446,8 @@
static int min_jitter_buffer = MIN_JITTER_BUFFER;
static int iaxthreadcount = DEFAULT_THREAD_COUNT;
+static int iaxmaxthreadcount = DEFAULT_MAX_THREAD_COUNT;
+static int iaxdynamicthreadcount = 0;
struct iax_rr {
int jitter;
@@ -681,8 +685,12 @@
#define IAX_IOSTATE_PROCESSING 2
#define IAX_IOSTATE_SCHEDREADY 3
+#define IAX_TYPE_POOL 1
+#define IAX_TYPE_DYNAMIC 2
+
struct iax2_thread {
- ASTOBJ_COMPONENTS(struct iax2_thread);
+ AST_LIST_ENTRY(iax2_thread) list;
+ int type;
int iostate;
#ifdef SCHED_MULTITHREADED
void (*schedfunc)(void *);
@@ -704,11 +712,12 @@
ast_cond_t cond;
};
-struct iax2_thread_list {
- ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
-};
-
-static struct iax2_thread_list idlelist, activelist;
+/* Thread lists */
+static AST_LIST_HEAD_STATIC(idle_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(active_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(dynamic_list, iax2_thread);
+
+static void *iax2_process_thread(void *data);
static void signal_condition(ast_mutex_t *lock, ast_cond_t *cond)
{
@@ -830,19 +839,59 @@
static struct iax2_thread *find_idle_thread(void)
{
- struct iax2_thread *thread;
- thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+ struct iax2_thread *thread = NULL;
+
+ /* Find free idle thread in the list, get a pointer to it, and remove it from the list */
+ AST_LIST_LOCK(&idle_list);
+ thread = AST_LIST_FIRST(&idle_list);
+ if (thread != NULL) {
+ AST_LIST_REMOVE(&idle_list, thread, list);
+ thread->list.next = NULL;
+ }
+ AST_LIST_UNLOCK(&idle_list);
+
+ /* If no idle thread is available from the regular list, try dynamic */
+ if (thread == NULL) {
+ AST_LIST_LOCK(&dynamic_list);
+ thread = AST_LIST_FIRST(&dynamic_list);
+ if (thread != NULL) {
+ AST_LIST_REMOVE(&dynamic_list, thread, list);
+ thread->list.next = NULL;
+ }
+ /* Make sure we absolutely have a thread... if not, try to make one if allowed */
+ if (thread == NULL && iaxmaxthreadcount > iaxdynamicthreadcount) {
+ /* We need to MAKE a thread! */
+ thread = ast_calloc(1, sizeof(*thread));
+ if (thread != NULL) {
+ thread->threadnum = iaxdynamicthreadcount;
+ thread->type = IAX_TYPE_DYNAMIC;
+ ast_mutex_init(&thread->lock);
+ ast_cond_init(&thread->cond, NULL);
+ if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+ free(thread);
+ thread = NULL;
+ } else {
+ /* All went well and the thread is up, so increment our count */
+ iaxdynamicthreadcount++;
+ }
+ }
+ }
+ AST_LIST_UNLOCK(&dynamic_list);
+ }
+
return thread;
}
#ifdef SCHED_MULTITHREADED
static int __schedule_action(void (*func)(void *data), void *data, const char *funcname)
{
- struct iax2_thread *thread;
+ struct iax2_thread *thread = NULL;
static time_t lasterror;
static time_t t;
+
thread = find_idle_thread();
- if (thread) {
+
+ if (thread != NULL) {
thread->schedfunc = func;
thread->scheddata = data;
thread->iostate = IAX_IOSTATE_SCHEDREADY;
@@ -856,6 +905,7 @@
if (t != lasterror)
ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
lasterror = t;
+
return -1;
}
#define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__)
@@ -4445,42 +4495,60 @@
static int iax2_show_threads(int fd, int argc, char *argv[])
{
+ struct iax2_thread *thread = NULL;
time_t t;
- int threadcount = 0;
+ int threadcount = 0, dynamiccount = 0;
+ char type;
+
if (argc != 3)
return RESULT_SHOWUSAGE;
ast_cli(fd, "IAX2 Thread Information\n");
time(&t);
ast_cli(fd, "Idle Threads:\n");
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_TRAVERSE(&idle_list, thread, list) {
#ifdef DEBUG_SCHED_MULTITHREAD
- ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
+#else
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
+#endif
threadcount++;
- });
+ }
+ AST_LIST_UNLOCK(&idle_list);
+ ast_cli(fd, "Active Threads:\n");
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_TRAVERSE(&active_list, thread, list) {
+ if (thread->type == IAX_TYPE_DYNAMIC)
+ type = 'D';
+ else
+ type = 'P';
+#ifdef DEBUG_SCHED_MULTITHREAD
+ ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d, func ='%s'\n",
+ type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
#else
- ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+ ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d\n",
+ type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
+#endif
threadcount++;
- });
+ }
+ AST_LIST_UNLOCK(&active_list);
+ ast_cli(fd, "Dynamic Threads:\n");
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+#ifdef DEBUG_SCHED_MULTITHREAD
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
+#else
+ ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n",
+ thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
#endif
- ast_cli(fd, "Active Threads:\n");
-#ifdef DEBUG_SCHED_MULTITHREAD
- ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
- threadcount++;
- });
-#else
- ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
- ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n",
- iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
- threadcount++;
- });
-#endif
- ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+ dynamiccount++;
+ }
+ AST_LIST_UNLOCK(&dynamic_list);
+ ast_cli(fd, "%d of %d threads accounted for with %d dynamic threads\n", threadcount, iaxthreadcount, dynamiccount);
return RESULT_SUCCESS;
}
@@ -6556,11 +6624,15 @@
if (errno != ECONNREFUSED)
ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
handle_error();
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
return 1;
}
if (test_losspct && ((100.0*rand()/(RAND_MAX+1.0)) < test_losspct)) { /* simulate random loss condition */
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
return 1;
}
/* Mark as ready and send on its way */
@@ -7935,34 +8007,44 @@
return 1;
}
-static void destroy_helper(struct iax2_thread *thread)
-{
- ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
- ast_mutex_destroy(&thread->lock);
- ast_cond_destroy(&thread->cond);
- free(thread);
-}
-
static void *iax2_process_thread(void *data)
{
- struct iax2_thread *thread_copy, *thread = data;
+ struct iax2_thread *thread = data;
+ struct timeval tv;
+ struct timespec ts;
for(;;) {
/* Wait for something to signal us to be awake */
ast_mutex_lock(&thread->lock);
- ast_cond_wait(&thread->cond, &thread->lock);
+ if (thread->type == IAX_TYPE_DYNAMIC) {
+ /* Wait to be signalled or time out */
+ tv = ast_tvadd(ast_tvnow(), ast_samp2tv(30000, 1000));
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000;
+ if (ast_cond_timedwait(&thread->cond, &thread->lock, &ts) == ETIMEDOUT) {
+ ast_mutex_unlock(&thread->lock);
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_REMOVE(&dynamic_list, thread, list);
+ iaxdynamicthreadcount--;
+ AST_LIST_UNLOCK(&dynamic_list);
+ break;
+ }
+ } else {
+ ast_cond_wait(&thread->cond, &thread->lock);
+ }
ast_mutex_unlock(&thread->lock);
- /* Unlink from idlelist / activelist if there*/
- ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
- ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
- /* If instructed to halt, stop now */
+
+ /* If we were signalled, then we are already out of both lists or we are shutting down */
if (thread->halt) {
- ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
- ASTOBJ_UNREF(thread, destroy_helper);
break;
}
- /* Remove our reference */
- ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+
+ /* Add ourselves to the active list now */
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_INSERT_HEAD(&active_list, thread, list);
+ AST_LIST_UNLOCK(&active_list);
+
+ /* See what we need to do */
switch(thread->iostate) {
case IAX_IOSTATE_READY:
thread->actions++;
@@ -7982,16 +8064,31 @@
#ifdef DEBUG_SCHED_MULTITHREAD
thread->curfunc[0]='\0';
#endif
- ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
- /* Make a copy so we don't lose thread, but if
- we become unreferenced here, our thread gets
- cancelled anyway, so it's okay */
- thread_copy = thread;
- ASTOBJ_UNREF(thread_copy, destroy_helper);
- thread_copy = thread;
- ASTOBJ_UNREF(thread_copy, destroy_helper);
- }
+
+ /* Now... remove ourselves from the active list, and return to the idle list */
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_REMOVE(&active_list, thread, list);
+ thread->list.next = NULL;
+ AST_LIST_UNLOCK(&active_list);
+
+ /* Go back into our respective list */
+ if (thread->type == IAX_TYPE_DYNAMIC) {
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_INSERT_TAIL(&dynamic_list, thread, list);
+ AST_LIST_UNLOCK(&dynamic_list);
+ } else {
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
+ }
+ }
+
+ /* Free our own memory */
+ ast_mutex_destroy(&thread->lock);
+ ast_cond_destroy(&thread->cond);
+ free(thread);
+ thread = NULL;
+
return NULL;
}
@@ -8396,12 +8493,10 @@
{
int threadcount = 0;
int x;
- ASTOBJ_CONTAINER_INIT(&idlelist);
- ASTOBJ_CONTAINER_INIT(&activelist);
for (x = 0; x < iaxthreadcount; x++) {
struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
if (thread) {
- ASTOBJ_INIT(thread);
+ thread->type = IAX_TYPE_POOL;
thread->threadnum = ++threadcount;
ast_mutex_init(&thread->lock);
ast_cond_init(&thread->cond, NULL);
@@ -8410,8 +8505,9 @@
free(thread);
thread = NULL;
}
- ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
- ASTOBJ_UNREF(thread, destroy_helper);
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+ AST_LIST_UNLOCK(&idle_list);
}
}
ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
@@ -9062,6 +9158,21 @@
} else if (iaxthreadcount > 256) {
ast_log(LOG_NOTICE, "limiting iaxthreadcount to 256\n");
iaxthreadcount = 256;
+ }
+ }
+ } else if (!strcasecmp(v->name, "iaxmaxthreadcount")) {
+ if (reload) {
+ AST_LIST_LOCK(&dynamic_list);
+ iaxmaxthreadcount = atoi(v->value);
+ AST_LIST_UNLOCK(&dynamic_list);
+ } else {
+ iaxmaxthreadcount = atoi(v->value);
+ if (iaxmaxthreadcount < 0) {
+ ast_log(LOG_NOTICE, "iaxmaxthreadcount must be at least 0.\n");
+ iaxmaxthreadcount = 0;
+ } else if (iaxmaxthreadcount > 256) {
+ ast_log(LOG_NOTICE, "Limiting iaxmaxthreadcount to 256\n");
+ iaxmaxthreadcount = 256;
}
}
} else if (!strcasecmp(v->name, "nochecksums")) {
@@ -9935,7 +10046,9 @@
static int __unload_module(void)
{
+ struct iax2_thread *thread = NULL;
int x;
+
/* Cancel the network thread, close the net socket */
if (netthreadid != AST_PTHREADT_NULL) {
pthread_cancel(netthreadid);
@@ -9949,19 +10062,29 @@
ast_mutex_unlock(&sched_lock);
pthread_join(schedthreadid, NULL);
}
- while (idlelist.head || activelist.head) {
- ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
- iterator->halt = 1;
- signal_condition(&iterator->lock, &iterator->cond);
- });
- ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
- iterator->halt = 1;
- signal_condition(&iterator->lock, &iterator->cond);
- });
- usleep(100000);
- }
- ASTOBJ_CONTAINER_DESTROY(&idlelist);
- ASTOBJ_CONTAINER_DESTROY(&activelist);
+
+ /* Call for all threads to halt */
+ AST_LIST_LOCK(&idle_list);
+ AST_LIST_TRAVERSE(&idle_list, thread, list) {
+ thread->halt = 1;
+ signal_condition(&thread->lock, &thread->cond);
+ }
+ AST_LIST_UNLOCK(&idle_list);
+
+ AST_LIST_LOCK(&active_list);
+ AST_LIST_TRAVERSE(&active_list, thread, list) {
+ thread->halt = 1;
+ signal_condition(&thread->lock, &thread->cond);
+ }
+ AST_LIST_UNLOCK(&active_list);
+
+ AST_LIST_LOCK(&dynamic_list);
+ AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+ thread->halt = 1;
+ signal_condition(&thread->lock, &thread->cond);
+ }
+ AST_LIST_UNLOCK(&dynamic_list);
+
ast_netsock_release(netsock);
for (x=0;x<IAX_MAX_CALLS;x++)
if (iaxs[x])
Modified: team/file/iax2-threading/configs/iax.conf.sample
URL: http://svn.digium.com/view/asterisk/team/file/iax2-threading/configs/iax.conf.sample?rev=16801&r1=16800&r2=16801&view=diff
==============================================================================
--- team/file/iax2-threading/configs/iax.conf.sample (original)
+++ team/file/iax2-threading/configs/iax.conf.sample Fri Mar 31 14:24:24 2006
@@ -166,6 +166,8 @@
; IAX helper threads
; Establishes the number of iax helper threads to handle I/O.
; iaxthreadcount = 10
+; Establishes the maximum number of dynamic iax helper threads (threads that are created only under heavy load, i.e. when no pool threads are available)
+; iaxmaxthreadcount = 100
;
; We can register with another IAX server to let him know where we are
; in case we have a dynamic IP address for example
More information about the asterisk-commits
mailing list