[asterisk-commits] trunk r19254 - in /trunk: channels/chan_iax2.c configs/iax.conf.sample

asterisk-commits at lists.digium.com asterisk-commits at lists.digium.com
Tue Apr 11 09:44:13 MST 2006


Author: file
Date: Tue Apr 11 11:44:10 2006
New Revision: 19254

URL: http://svn.digium.com/view/asterisk?rev=19254&view=rev
Log:
Convert chan_iax2 to use linked lists for multithreading, and add dynamic threads. These are used when all pool threads are in use, and will stick around until load dies down. The theory is that during high load you'll have more threads available, and during low load you'll only have the normal pool threads sticking around.

Modified:
    trunk/channels/chan_iax2.c
    trunk/configs/iax.conf.sample

Modified: trunk/channels/chan_iax2.c
URL: http://svn.digium.com/view/asterisk/trunk/channels/chan_iax2.c?rev=19254&r1=19253&r2=19254&view=diff
==============================================================================
--- trunk/channels/chan_iax2.c (original)
+++ trunk/channels/chan_iax2.c Tue Apr 11 11:44:10 2006
@@ -92,6 +92,7 @@
 #include "asterisk/devicestate.h"
 #include "asterisk/netsock.h"
 #include "asterisk/stringfields.h"
+#include "asterisk/linkedlists.h"
 
 #include "iax2.h"
 #include "iax2-parser.h"
@@ -134,6 +135,7 @@
 #define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
 
 #define DEFAULT_THREAD_COUNT 10
+#define DEFAULT_MAX_THREAD_COUNT 100
 #define DEFAULT_RETRY_TIME 1000
 #define MEMORY_SIZE 100
 #define DEFAULT_DROP 3
@@ -444,6 +446,8 @@
 static int min_jitter_buffer = MIN_JITTER_BUFFER;
 
 static int iaxthreadcount = DEFAULT_THREAD_COUNT;
+static int iaxmaxthreadcount = DEFAULT_MAX_THREAD_COUNT;
+static int iaxdynamicthreadcount = 0;
 
 struct iax_rr {
 	int jitter;
@@ -681,8 +685,12 @@
 #define IAX_IOSTATE_PROCESSING	2
 #define IAX_IOSTATE_SCHEDREADY	3
 
+#define IAX_TYPE_POOL    1
+#define IAX_TYPE_DYNAMIC 2
+
 struct iax2_thread {
-	ASTOBJ_COMPONENTS(struct iax2_thread);
+	AST_LIST_ENTRY(iax2_thread) list;
+	int type;
 	int iostate;
 #ifdef SCHED_MULTITHREADED
 	void (*schedfunc)(void *);
@@ -704,11 +712,12 @@
 	ast_cond_t cond;
 };
 
-struct iax2_thread_list {
-	ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
-};
-
-static struct iax2_thread_list idlelist, activelist;
+/* Thread lists */
+static AST_LIST_HEAD_STATIC(idle_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(active_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(dynamic_list, iax2_thread);
+
+static void *iax2_process_thread(void *data);
 
 static void signal_condition(ast_mutex_t *lock, ast_cond_t *cond)
 {
@@ -831,19 +840,59 @@
 
 static struct iax2_thread *find_idle_thread(void)
 {
-	struct iax2_thread *thread;
-	thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+	struct iax2_thread *thread = NULL;
+
+	/* Find free idle thread in the list, get a pointer to it, and remove it from the list */
+	AST_LIST_LOCK(&idle_list);
+	thread = AST_LIST_FIRST(&idle_list);
+	if (thread != NULL) {
+		AST_LIST_REMOVE(&idle_list, thread, list);
+		thread->list.next = NULL;
+	}
+	AST_LIST_UNLOCK(&idle_list);
+
+	/* If no idle thread is available from the regular list, try dynamic */
+	if (thread == NULL) {
+		AST_LIST_LOCK(&dynamic_list);
+		thread = AST_LIST_FIRST(&dynamic_list);
+		if (thread != NULL) {
+			AST_LIST_REMOVE(&dynamic_list, thread, list);
+			thread->list.next = NULL;
+		}
+		/* Make sure we absolutely have a thread... if not, try to make one if allowed */
+		if (thread == NULL && iaxmaxthreadcount > iaxdynamicthreadcount) {
+			/* We need to MAKE a thread! */
+			thread = ast_calloc(1, sizeof(*thread));
+			if (thread != NULL) {
+				thread->threadnum = iaxdynamicthreadcount;
+				thread->type = IAX_TYPE_DYNAMIC;
+				ast_mutex_init(&thread->lock);
+				ast_cond_init(&thread->cond, NULL);
+				if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+					free(thread);
+					thread = NULL;
+				} else {
+					/* All went well and the thread is up, so increment our count */
+					iaxdynamicthreadcount++;
+				}
+			}
+		}
+		AST_LIST_UNLOCK(&dynamic_list);
+	}
+
 	return thread;
 }
 
 #ifdef SCHED_MULTITHREADED
 static int __schedule_action(void (*func)(void *data), void *data, const char *funcname)
 {
-	struct iax2_thread *thread;
+	struct iax2_thread *thread = NULL;
 	static time_t lasterror;
 	static time_t t;
+
 	thread = find_idle_thread();
-	if (thread) {
+
+	if (thread != NULL) {
 		thread->schedfunc = func;
 		thread->scheddata = data;
 		thread->iostate = IAX_IOSTATE_SCHEDREADY;
@@ -857,6 +906,7 @@
 	if (t != lasterror) 
 		ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
 	lasterror = t;
+
 	return -1;
 }
 #define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__)
@@ -4419,42 +4469,60 @@
 
 static int iax2_show_threads(int fd, int argc, char *argv[])
 {
+	struct iax2_thread *thread = NULL;
 	time_t t;
-	int threadcount = 0;
+	int threadcount = 0, dynamiccount = 0;
+	char type;
+
 	if (argc != 3)
 		return RESULT_SHOWUSAGE;
 		
 	ast_cli(fd, "IAX2 Thread Information\n");
 	time(&t);
 	ast_cli(fd, "Idle Threads:\n");
+	AST_LIST_LOCK(&idle_list);
+	AST_LIST_TRAVERSE(&idle_list, thread, list) {
 #ifdef DEBUG_SCHED_MULTITHREAD
-	ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
-		ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n", 
-			iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
+		ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n", 
+			thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
+#else
+		ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n", 
+			thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
+#endif
 		threadcount++;
-	});
+	}
+	AST_LIST_UNLOCK(&idle_list);
+	ast_cli(fd, "Active Threads:\n");
+	AST_LIST_LOCK(&active_list);
+	AST_LIST_TRAVERSE(&active_list, thread, list) {
+		if (thread->type == IAX_TYPE_DYNAMIC)
+			type = 'D';
+		else
+			type = 'P';
+#ifdef DEBUG_SCHED_MULTITHREAD
+		ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d, func ='%s'\n", 
+			type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
 #else
-	ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
-		ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n", 
-			iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+		ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d\n", 
+			type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
+#endif
 		threadcount++;
-	});
+	}
+	AST_LIST_UNLOCK(&active_list);
+	ast_cli(fd, "Dynamic Threads:\n");
+        AST_LIST_LOCK(&dynamic_list);
+        AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+#ifdef DEBUG_SCHED_MULTITHREAD
+                ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n",
+                        thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
+#else
+                ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n",
+                        thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
 #endif
-	ast_cli(fd, "Active Threads:\n");
-#ifdef DEBUG_SCHED_MULTITHREAD
-	ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
-		ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n", 
-			iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
-		threadcount++;
-	});
-#else
-	ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
-		ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n", 
-			iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
-		threadcount++;
-	});
-#endif
-	ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+		dynamiccount++;
+        }
+        AST_LIST_UNLOCK(&dynamic_list);
+	ast_cli(fd, "%d of %d threads accounted for with %d dynamic threads\n", threadcount, iaxthreadcount, dynamiccount);
 	return RESULT_SUCCESS;
 }
 
@@ -6512,11 +6580,15 @@
 			if (errno != ECONNREFUSED)
 				ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
 			handle_error();
-			ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+			AST_LIST_LOCK(&idle_list);
+			AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+			AST_LIST_UNLOCK(&idle_list);
 			return 1;
 		}
 		if (test_losspct && ((100.0 * ast_random() / (RAND_MAX + 1.0)) < test_losspct)) { /* simulate random loss condition */
-			ASTOBJ_CONTAINER_LINK_END(&idlelist, thread); 
+			AST_LIST_LOCK(&idle_list);
+			AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+			AST_LIST_UNLOCK(&idle_list);
 			return 1;
 		}
 		/* Mark as ready and send on its way */
@@ -7891,34 +7963,44 @@
 	return 1;
 }
 
-static void destroy_helper(struct iax2_thread *thread)
-{
-	ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
-	ast_mutex_destroy(&thread->lock);
-	ast_cond_destroy(&thread->cond);
-	free(thread);
-}
-
 static void *iax2_process_thread(void *data)
 {
-	struct iax2_thread *thread_copy, *thread = data;
+	struct iax2_thread *thread = data;
+	struct timeval tv;
+	struct timespec ts;
 
 	for(;;) {
 		/* Wait for something to signal us to be awake */
 		ast_mutex_lock(&thread->lock);
-		ast_cond_wait(&thread->cond, &thread->lock);
+		if (thread->type == IAX_TYPE_DYNAMIC) {
+			/* Wait to be signalled or time out */
+			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(30000, 1000));
+			ts.tv_sec = tv.tv_sec;
+			ts.tv_nsec = tv.tv_usec * 1000;
+			if (ast_cond_timedwait(&thread->cond, &thread->lock, &ts) == ETIMEDOUT) {
+				ast_mutex_unlock(&thread->lock);
+				AST_LIST_LOCK(&dynamic_list);
+				AST_LIST_REMOVE(&dynamic_list, thread, list);
+				iaxdynamicthreadcount--;
+				AST_LIST_UNLOCK(&dynamic_list);
+				break;
+			}
+		} else {
+			ast_cond_wait(&thread->cond, &thread->lock);
+		}
 		ast_mutex_unlock(&thread->lock);
-		/* Unlink from idlelist / activelist if there*/
-		ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
-		ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
-		/* If instructed to halt, stop now */
+
+		/* If we were signalled, then we are already out of both lists or we are shutting down */
 		if (thread->halt) {
-			ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
-			ASTOBJ_UNREF(thread, destroy_helper);
 			break;
 		}
-		/* Remove our reference */
-		ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+
+		/* Add ourselves to the active list now */
+		AST_LIST_LOCK(&active_list);
+		AST_LIST_INSERT_HEAD(&active_list, thread, list);
+		AST_LIST_UNLOCK(&active_list);
+
+		/* See what we need to do */
 		switch(thread->iostate) {
 		case IAX_IOSTATE_READY:
 			thread->actions++;
@@ -7938,16 +8020,31 @@
 #ifdef DEBUG_SCHED_MULTITHREAD
 		thread->curfunc[0]='\0';
 #endif		
-		ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
-		ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
-		/* Make a copy so we don't lose thread, but if 
-		   we become unreferenced here, our thread gets
-		   cancelled anyway, so it's okay */
-		thread_copy = thread;
-		ASTOBJ_UNREF(thread_copy, destroy_helper);
-		thread_copy = thread;
-		ASTOBJ_UNREF(thread_copy, destroy_helper);
-	}
+
+		/* Now... remove ourselves from the active list, and return to the idle list */
+		AST_LIST_LOCK(&active_list);
+		AST_LIST_REMOVE(&active_list, thread, list);
+		thread->list.next = NULL;
+		AST_LIST_UNLOCK(&active_list);
+
+		/* Go back into our respective list */
+		if (thread->type == IAX_TYPE_DYNAMIC) {
+			AST_LIST_LOCK(&dynamic_list);
+			AST_LIST_INSERT_TAIL(&dynamic_list, thread, list);
+			AST_LIST_UNLOCK(&dynamic_list);
+		} else {
+			AST_LIST_LOCK(&idle_list);
+			AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+			AST_LIST_UNLOCK(&idle_list);
+		}
+	}
+
+	/* Free our own memory */
+	ast_mutex_destroy(&thread->lock);
+	ast_cond_destroy(&thread->cond);
+	free(thread);
+	thread = NULL;
+
 	return NULL;
 }
 
@@ -8352,12 +8449,10 @@
 {
 	int threadcount = 0;
 	int x;
-	ASTOBJ_CONTAINER_INIT(&idlelist);
-	ASTOBJ_CONTAINER_INIT(&activelist);
 	for (x = 0; x < iaxthreadcount; x++) {
 		struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
 		if (thread) {
-			ASTOBJ_INIT(thread);
+			thread->type = IAX_TYPE_POOL;
 			thread->threadnum = ++threadcount;
 			ast_mutex_init(&thread->lock);
 			ast_cond_init(&thread->cond, NULL);
@@ -8366,8 +8461,9 @@
 				free(thread);
 				thread = NULL;
 			}
-			ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
-			ASTOBJ_UNREF(thread, destroy_helper);
+			AST_LIST_LOCK(&idle_list);
+			AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+			AST_LIST_UNLOCK(&idle_list);
 		}
 	}
 	ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
@@ -9018,6 +9114,21 @@
 				} else if (iaxthreadcount > 256) {
 					ast_log(LOG_NOTICE, "limiting iaxthreadcount to 256\n");
 					iaxthreadcount = 256;
+				}
+			}
+		} else if (!strcasecmp(v->name, "iaxmaxthreadcount")) {
+			if (reload) {
+				AST_LIST_LOCK(&dynamic_list);
+				iaxmaxthreadcount = atoi(v->value);
+				AST_LIST_UNLOCK(&dynamic_list);
+			} else {
+				iaxmaxthreadcount = atoi(v->value);
+				if (iaxmaxthreadcount < 0) {
+					ast_log(LOG_NOTICE, "iaxmaxthreadcount must be at least 0.\n");
+					iaxmaxthreadcount = 0;
+				} else if (iaxmaxthreadcount > 256) {
+					ast_log(LOG_NOTICE, "Limiting iaxmaxthreadcount to 256\n");
+					iaxmaxthreadcount = 256;
 				}
 			}
 		} else if (!strcasecmp(v->name, "nochecksums")) {
@@ -9891,7 +10002,9 @@
 
 static int __unload_module(void)
 {
+	struct iax2_thread *thread = NULL;
 	int x;
+
 	/* Cancel the network thread, close the net socket */
 	if (netthreadid != AST_PTHREADT_NULL) {
 		pthread_cancel(netthreadid);
@@ -9905,19 +10018,29 @@
 		ast_mutex_unlock(&sched_lock);
 		pthread_join(schedthreadid, NULL);
 	}
-	while (idlelist.head || activelist.head) {
-		ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
-			iterator->halt = 1;
-			signal_condition(&iterator->lock, &iterator->cond);
-		});
-		ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
-			iterator->halt = 1;
-			signal_condition(&iterator->lock, &iterator->cond);
-		});
-		usleep(100000);
-	}
-	ASTOBJ_CONTAINER_DESTROY(&idlelist);
-	ASTOBJ_CONTAINER_DESTROY(&activelist);
+
+	/* Call for all threads to halt */
+	AST_LIST_LOCK(&idle_list);
+	AST_LIST_TRAVERSE(&idle_list, thread, list) {
+		thread->halt = 1;
+		signal_condition(&thread->lock, &thread->cond);
+	}
+	AST_LIST_UNLOCK(&idle_list);
+
+	AST_LIST_LOCK(&active_list);
+	AST_LIST_TRAVERSE(&active_list, thread, list) {
+		thread->halt = 1;
+		signal_condition(&thread->lock, &thread->cond);
+	}
+	AST_LIST_UNLOCK(&active_list);
+
+	AST_LIST_LOCK(&dynamic_list);
+        AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+                thread->halt = 1;
+                signal_condition(&thread->lock, &thread->cond);
+        }
+        AST_LIST_UNLOCK(&dynamic_list);
+
 	ast_netsock_release(netsock);
 	for (x=0;x<IAX_MAX_CALLS;x++)
 		if (iaxs[x])

Modified: trunk/configs/iax.conf.sample
URL: http://svn.digium.com/view/asterisk/trunk/configs/iax.conf.sample?rev=19254&r1=19253&r2=19254&view=diff
==============================================================================
--- trunk/configs/iax.conf.sample (original)
+++ trunk/configs/iax.conf.sample Tue Apr 11 11:44:10 2006
@@ -170,6 +170,8 @@
 ; IAX helper threads
 ; Establishes the number of iax helper threads to handle I/O.
 ; iaxthreadcount = 10
+; Establishes the number of extra dynamic threads that may be spawned to handle I/O
+; iaxmaxthreadcount = 100
 ;
 ; We can register with another IAX server to let him know where we are
 ; in case we have a dynamic IP address for example



More information about the asterisk-commits mailing list