[asterisk-commits] trunk r11192 - in /trunk: channels/chan_iax2.c configs/iax.conf.sample

asterisk-commits at lists.digium.com asterisk-commits at lists.digium.com
Sun Feb 26 13:27:15 MST 2006


Author: markster
Date: Sun Feb 26 14:27:14 2006
New Revision: 11192

URL: http://svn.digium.com/view/asterisk?rev=11192&view=rev
Log:
Make IAX2 multithreaded

Modified:
    trunk/channels/chan_iax2.c
    trunk/configs/iax.conf.sample

Modified: trunk/channels/chan_iax2.c
URL: http://svn.digium.com/view/asterisk/trunk/channels/chan_iax2.c?rev=11192&r1=11191&r2=11192&view=diff
==============================================================================
--- trunk/channels/chan_iax2.c (original)
+++ trunk/channels/chan_iax2.c Sun Feb 26 14:27:14 2006
@@ -100,6 +100,10 @@
  * otherwise, use the old jitterbuffer */
 #define NEWJB
 
+/* Define SCHED_MULTITHREADED to run the scheduler in a special
+   multithreaded mode. */
+#define SCHED_MULTITHREADED
+
 #ifdef NEWJB
 #include "../jitterbuf.h"
 #endif
@@ -124,6 +128,7 @@
 #define PTR_TO_CALLNO(a) ((unsigned short)(unsigned long)(a))
 #define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
 
+#define DEFAULT_THREAD_COUNT 10
 #define DEFAULT_RETRY_TIME 1000
 #define MEMORY_SIZE 100
 #define DEFAULT_DROP 3
@@ -230,6 +235,7 @@
 static struct ast_flags globalflags = { 0 };
 
 static pthread_t netthreadid = AST_PTHREADT_NULL;
+static pthread_t schedthreadid = AST_PTHREADT_NULL;
 
 enum {
 	IAX_STATE_STARTED = 		(1 << 0),
@@ -428,6 +434,8 @@
 static int max_jitter_buffer = MAX_JITTER_BUFFER;
 /* If we have less than this much excess real jitter buffer, enlarge it. */
 static int min_jitter_buffer = MIN_JITTER_BUFFER;
+
+static int iaxthreadcount = DEFAULT_THREAD_COUNT;
 
 struct iax_rr {
 	int jitter;
@@ -660,6 +668,35 @@
 static void destroy_peer(struct iax2_peer *peer);
 static int ast_cli_netstats(int fd, int limit_fmt);
 
+#define IAX_IOSTATE_IDLE		0
+#define IAX_IOSTATE_READY		1
+#define IAX_IOSTATE_PROCESSING	2
+#define IAX_IOSTATE_SCHEDREADY	3
+
+struct iax2_thread {
+	ASTOBJ_COMPONENTS(struct iax2_thread);
+	int iostate;
+#ifdef SCHED_MULTITHREADED
+	void (*schedfunc)(void *);
+	void *scheddata;
+#endif
+	int actions;
+	int halt;
+	pthread_t threadid;
+	int threadnum;
+	struct sockaddr_in iosin;
+	unsigned char buf[4096]; 
+	int iores;
+	int iofd;
+	time_t checktime;
+};
+
+struct iax2_thread_list {
+	ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
+};
+
+static struct iax2_thread_list idlelist, activelist;
+
 static void iax_debug_output(const char *data)
 {
 	if (iaxdebug)
@@ -771,18 +808,58 @@
 	.fixup = iax2_fixup,
 };
 
+static struct iax2_thread *find_idle_thread(void)
+{
+	struct iax2_thread *thread;
+	thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+	return thread;
+}
+
+#ifdef SCHED_MULTITHREADED
+static int schedule_action(void (*func)(void *data), void *data)
+{
+	struct iax2_thread *thread;
+	static time_t lasterror;
+	static time_t t;
+	thread = find_idle_thread();
+	if (thread) {
+		thread->schedfunc = func;
+		thread->scheddata = data;
+		thread->iostate = IAX_IOSTATE_SCHEDREADY;
+		pthread_kill(thread->threadid, SIGURG);
+		return 0;
+	}
+	time(&t);
+	if (t != lasterror) 
+		ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
+	lasterror = t;
+	return -1;
+}
+#endif
+
+static void __send_ping(void *data)
+{
+	int callno = (long)data;
+	send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_PING, 0, NULL, 0, -1);
+}
+
 static int send_ping(void *data)
 {
 	int callno = (long)data;
-	/* Ping only if it's real, not if it's bridged */
 	if (iaxs[callno]) {
 #ifdef BRIDGE_OPTIMIZATION
-		if (!iaxs[callno]->bridgecallno)
+		if (!iaxs[callno]->bridgecallno) 
 #endif
-			send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_PING, 0, NULL, 0, -1);
+		{		
+#ifdef SCHED_MULTITHREADED
+			if (schedule_action(__send_ping, data))
+#endif		
+				__send_ping(data);
+		}
 		return 1;
 	} else
 		return 0;
+	return 0;
 }
 
 static int get_encrypt_methods(const char *s)
@@ -797,18 +874,30 @@
 	return e;
 }
 
-static int send_lagrq(void *data)
+static void __send_lagrq(void *data)
 {
 	int callno = (long)data;
 	/* Ping only if it's real not if it's bridged */
+	send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_LAGRQ, 0, NULL, 0, -1);
+}
+
+static int send_lagrq(void *data)
+{
+	int callno = (long)data;
 	if (iaxs[callno]) {
 #ifdef BRIDGE_OPTIMIZATION
-		if (!iaxs[callno]->bridgecallno)
+		if (!iaxs[callno]->bridgecallno) 
+#endif
+		{		
+#ifdef SCHED_MULTITHREADED
+			if (schedule_action(__send_lagrq, data))
 #endif		
-			send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_LAGRQ, 0, NULL, 0, -1);
+				__send_lagrq(data);
+		}
 		return 1;
 	} else
 		return 0;
+	return 0;
 }
 
 static unsigned char compress_subclass(int subclass)
@@ -1422,7 +1511,7 @@
 }
 
 #ifndef NEWJB
-static int do_deliver(void *data)
+static int __real_do_deliver(void *data)
 {
 	/* Locking version of __do_deliver */
 	struct iax_frame *fr = data;
@@ -1432,6 +1521,14 @@
 	res = __do_deliver(data);
 	ast_mutex_unlock(&iaxsl[callno]);
 	return res;
+}
+static int do_deliver(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__do_deliver, data))
+#endif		
+		__real_do_deliver(data);
+	return 0;
 }
 #endif /* NEWJB */
 
@@ -1693,7 +1790,8 @@
 	return 0;
 }
 
-static int attempt_transmit(void *data)
+static int attempt_transmit(void *data);
+static void __attempt_transmit(void *data)
 {
 	/* Attempt to transmit the frame to the remote peer...
 	   Called without iaxsl held. */
@@ -1780,6 +1878,14 @@
 		/* Free the IAX frame */
 		iax2_frame_free(f);
 	}
+}
+
+static int attempt_transmit(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__attempt_transmit, data))
+#endif		
+		__attempt_transmit(data);
 	return 0;
 }
 
@@ -2166,9 +2272,10 @@
     }
 
     pvt->jbid = ast_sched_add(sched, when, get_from_jb, (void *)pvt);
-}
-
-static int get_from_jb(void *p) 
+	pthread_kill(schedthreadid, SIGURG);
+}
+
+static void __get_from_jb(void *p) 
 {
 	/* make sure pvt is valid! */	
     struct chan_iax2_pvt *pvt = p;
@@ -2238,7 +2345,15 @@
     }
     update_jbsched(pvt);
     ast_mutex_unlock(&iaxsl[pvt->callno]);
-    return 0;
+}
+
+static int get_from_jb(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__get_from_jb, data))
+#endif		
+		__get_from_jb(data);
+	return 0;
 }
 #endif
 
@@ -2491,6 +2606,7 @@
 		if (option_debug && iaxdebug)
 			ast_log(LOG_DEBUG, "schedule_delivery: Scheduling delivery in %d ms\n", delay);
 		fr->retrans = ast_sched_add(sched, delay, do_deliver, fr);
+		pthread_kill(schedthreadid, SIGURG);
 	}
 #endif
 	if (tsout)
@@ -2524,8 +2640,9 @@
 	}
 	iaxq.count++;
 	ast_mutex_unlock(&iaxq.lock);
-	/* Wake up the network thread */
+	/* Wake up the network and scheduler thread */
 	pthread_kill(netthreadid, SIGURG);
+	pthread_kill(schedthreadid, SIGURG);
 	return 0;
 }
 
@@ -2827,7 +2944,7 @@
 	return 0;
 }
 
-static int auto_congest(void *nothing)
+static void __auto_congest(void *nothing)
 {
 	int callno = PTR_TO_CALLNO(nothing);
 	struct ast_frame f = { AST_FRAME_CONTROL, AST_CONTROL_CONGESTION };
@@ -2838,6 +2955,14 @@
 		ast_log(LOG_NOTICE, "Auto-congesting call due to slow response\n");
 	}
 	ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auto_congest(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__auto_congest, data))
+#endif		
+		__auto_congest(data);
 	return 0;
 }
 
@@ -4280,6 +4405,31 @@
 #undef FORMAT2
 }
 
+static int iax2_show_threads(int fd, int argc, char *argv[])
+{
+	time_t t;
+	int threadcount = 0;
+	if (argc != 3)
+		return RESULT_SHOWUSAGE;
+		
+	ast_cli(fd, "IAX2 Thread Information\n");
+	time(&t);
+	ast_cli(fd, "Idle Threads:\n");
+	ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
+		ast_cli(fd, "Thread %d: state %d, last update: %d seconds ago, %d actions handled, refcnt = %d\n", 
+			iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+		threadcount++;
+	});
+	ast_cli(fd, "Active Threads:\n");
+	ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
+		ast_cli(fd, "Thread %d: state %d, last update: %d seconds ago, %d actions handled, refcnt = %d\n", 
+			iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+		threadcount++;
+	});
+	ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+	return RESULT_SUCCESS;
+}
+
 static int iax2_show_peers(int fd, int argc, char *argv[])
 {
 	return __iax2_show_peers(0, fd, argc, argv);
@@ -5277,11 +5427,19 @@
 
 static int iax2_do_register(struct iax2_registry *reg);
 
-static int iax2_do_register_s(void *data)
+static void __iax2_do_register_s(void *data)
 {
 	struct iax2_registry *reg = data;
 	reg->expire = -1;
 	iax2_do_register(reg);
+}
+
+static int iax2_do_register_s(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__iax2_do_register_s, data))
+#endif		
+		__iax2_do_register_s(data);
 	return 0;
 }
 
@@ -5562,7 +5720,7 @@
 }
 static void prune_peers(void);
 
-static int expire_registry(void *data)
+static void __expire_registry(void *data)
 {
 	struct iax2_peer *p = data;
 
@@ -5584,10 +5742,16 @@
 		ast_set_flag(p, IAX_DELME);
 		prune_peers();
 	}
-
+}
+
+static int expire_registry(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__expire_registry, data))
+#endif		
+		__expire_registry(data);
 	return 0;
 }
-
 
 static int iax2_poke_peer(struct iax2_peer *peer, int heldcall);
 
@@ -5828,7 +5992,7 @@
 		return 0;
 }
 
-static int auth_reject(void *nothing)
+static void __auth_reject(void *nothing)
 {
 	/* Called from IAX thread only, without iaxs lock */
 	int callno = (int)(long)(nothing);
@@ -5847,6 +6011,14 @@
 		send_command_final(iaxs[callno], AST_FRAME_IAX, iaxs[callno]->authfail, 0, ied.buf, ied.pos, -1);
 	}
 	ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auth_reject(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__auth_reject, data))
+#endif		
+		__auth_reject(data);
 	return 0;
 }
 
@@ -5868,7 +6040,7 @@
 	return 0;
 }
 
-static int auto_hangup(void *nothing)
+static void __auto_hangup(void *nothing)
 {
 	/* Called from IAX thread only, without iaxs lock */
 	int callno = (int)(long)(nothing);
@@ -5882,6 +6054,14 @@
 		send_command_final(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_HANGUP, 0, ied.buf, ied.pos, -1);
 	}
 	ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auto_hangup(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__auto_hangup, data))
+#endif		
+		__auto_hangup(data);
 	return 0;
 }
 
@@ -5919,11 +6099,19 @@
 	ast_mutex_unlock(&iaxq.lock);
 }
 
-static int iax2_poke_peer_s(void *data)
+static void __iax2_poke_peer_s(void *data)
 {
 	struct iax2_peer *peer = data;
 	peer->pokeexpire = -1;
 	iax2_poke_peer(peer, 0);
+}
+
+static int iax2_poke_peer_s(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__iax2_poke_peer_s, data))
+#endif		
+		__iax2_poke_peer_s(data);
 	return 0;
 }
 
@@ -6258,18 +6446,50 @@
 
 static int socket_read(int *id, int fd, short events, void *cbdata)
 {
+	struct iax2_thread *thread;
+	socklen_t len;
+	thread = find_idle_thread();
+	time_t t;
+	static time_t last_errtime=0;
+	if (thread) {
+		len = sizeof(thread->iosin);
+		thread->iofd = fd;
+		thread->iores = recvfrom(fd, thread->buf, sizeof(thread->buf), 0,(struct sockaddr *) &thread->iosin, &len);
+		if (thread->iores < 0) {
+			if (errno != ECONNREFUSED)
+				ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
+			handle_error();
+			return 1;
+		}
+		if(test_losspct) { /* simulate random loss condition */
+			if( (100.0*rand()/(RAND_MAX+1.0)) < test_losspct) 
+				return 1;
+		}
+		/* Mark as ready and send on its way */
+		thread->iostate = IAX_IOSTATE_READY;
+		pthread_kill(thread->threadid, SIGURG);
+	} else {
+		time(&t);
+		if (t != last_errtime)
+			ast_log(LOG_NOTICE, "Out of idle IAX2 threads for I/O, pausing!\n");
+		last_errtime = t;
+		usleep(1);
+	}
+	return 1;
+}
+
+static int socket_process(struct iax2_thread *thread)
+{
 	struct sockaddr_in sin;
 	int res;
 	int updatehistory=1;
 	int new = NEW_PREVENT;
-	unsigned char buf[4096]; 
 	void *ptr;
-	socklen_t len = sizeof(sin);
 	int dcallno = 0;
-	struct ast_iax2_full_hdr *fh = (struct ast_iax2_full_hdr *)buf;
-	struct ast_iax2_mini_hdr *mh = (struct ast_iax2_mini_hdr *)buf;
-	struct ast_iax2_meta_hdr *meta = (struct ast_iax2_meta_hdr *)buf;
-	struct ast_iax2_video_hdr *vh = (struct ast_iax2_video_hdr *)buf;
+	struct ast_iax2_full_hdr *fh = (struct ast_iax2_full_hdr *)thread->buf;
+	struct ast_iax2_mini_hdr *mh = (struct ast_iax2_mini_hdr *)thread->buf;
+	struct ast_iax2_meta_hdr *meta = (struct ast_iax2_meta_hdr *)thread->buf;
+	struct ast_iax2_video_hdr *vh = (struct ast_iax2_video_hdr *)thread->buf;
 	struct ast_iax2_meta_trunk_hdr *mth;
 	struct ast_iax2_meta_trunk_entry *mte;
 	struct ast_iax2_meta_trunk_mini *mtm;
@@ -6286,6 +6506,7 @@
 	struct iax_ies ies;
 	struct iax_ie_data ied0, ied1;
 	int format;
+	int fd;
 	int exists;
 	int minivid = 0;
 	unsigned int ts;
@@ -6298,19 +6519,12 @@
 
 	dblbuf[0] = 0;	/* Keep GCC from whining */
 	fr.callno = 0;
-	
-	res = recvfrom(fd, buf, sizeof(buf), 0,(struct sockaddr *) &sin, &len);
-	if (res < 0) {
-		if (errno != ECONNREFUSED)
-			ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
-		handle_error();
-		return 1;
-	}
-	if(test_losspct) { /* simulate random loss condition */
-		if( (100.0*rand()/(RAND_MAX+1.0)) < test_losspct) 
-			return 1;
- 
-	}
+
+	/* Copy frequently used parameters to the stack */
+	res = thread->iores;
+	fd = thread->iofd;
+	memcpy(&sin, &thread->iosin, sizeof(sin));
+
 	if (res < sizeof(struct ast_iax2_mini_hdr)) {
 		ast_log(LOG_WARNING, "midget packet received (%d of %d min)\n", res, (int)sizeof(struct ast_iax2_mini_hdr));
 		return 1;
@@ -6643,14 +6857,14 @@
 
 		if (f.datalen) {
 			if (f.frametype == AST_FRAME_IAX) {
-				if (iax_parse_ies(&ies, buf + sizeof(struct ast_iax2_full_hdr), f.datalen)) {
+				if (iax_parse_ies(&ies, thread->buf + sizeof(struct ast_iax2_full_hdr), f.datalen)) {
 					ast_log(LOG_WARNING, "Undecodable frame received from '%s'\n", ast_inet_ntoa(iabuf, sizeof(iabuf), sin.sin_addr));
 					ast_mutex_unlock(&iaxsl[fr.callno]);
 					return 1;
 				}
 				f.data = NULL;
 			} else
-				f.data = buf + sizeof(struct ast_iax2_full_hdr);
+				f.data = thread->buf + sizeof(struct ast_iax2_full_hdr);
 		} else {
 			if (f.frametype == AST_FRAME_IAX)
 				f.data = NULL;
@@ -7526,7 +7740,7 @@
 		}
 		f.datalen = res - sizeof(struct ast_iax2_video_hdr);
 		if (f.datalen)
-			f.data = buf + sizeof(struct ast_iax2_video_hdr);
+			f.data = thread->buf + sizeof(struct ast_iax2_video_hdr);
 		else
 			f.data = NULL;
 #ifdef IAXTESTS
@@ -7553,7 +7767,7 @@
 			return 1;
 		}
 		if (f.datalen)
-			f.data = buf + sizeof(struct ast_iax2_mini_hdr);
+			f.data = thread->buf + sizeof(struct ast_iax2_mini_hdr);
 		else
 			f.data = NULL;
 #ifdef IAXTESTS
@@ -7618,6 +7832,59 @@
 	/* Always run again */
 	ast_mutex_unlock(&iaxsl[fr.callno]);
 	return 1;
+}
+
+static void destroy_helper(struct iax2_thread *thread)
+{
+	ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
+	free(thread);
+}
+
+static void *iax2_process_thread(void *data)
+{
+	struct iax2_thread *thread_copy, *thread = data;
+	struct timeval tv;
+	for(;;) {
+		/* Sleep for up to 1 second */
+		tv.tv_sec = 1;
+		tv.tv_usec = 0;
+		select(0, NULL, NULL, NULL, &tv);
+		/* Unlink from idlelist / activelist if there*/
+		ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
+		ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
+		/* If instructed to halt, stop now */
+		if (thread->halt) {
+			ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
+			ASTOBJ_UNREF(thread, destroy_helper);
+			break;
+		}
+		/* Remove our reference */
+		ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+		switch(thread->iostate) {
+		case IAX_IOSTATE_READY:
+			thread->actions++;
+			thread->iostate = IAX_IOSTATE_PROCESSING;
+			socket_process(thread);
+			break;
+		case IAX_IOSTATE_SCHEDREADY:
+			thread->actions++;
+			thread->iostate = IAX_IOSTATE_PROCESSING;
+			thread->schedfunc(thread->scheddata);
+			break;
+		}
+		time(&thread->checktime);
+		thread->iostate = IAX_IOSTATE_IDLE;
+		ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
+		ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+		/* Make a copy so we don't lose thread, but if 
+		   we become unreferenced here, our thread gets
+		   cancelled anyway, so it's okay */
+		thread_copy = thread;
+		ASTOBJ_UNREF(thread_copy, destroy_helper);
+		thread_copy = thread;
+		ASTOBJ_UNREF(thread_copy, destroy_helper);
+	}
+	return NULL;
 }
 
 static int iax2_do_register(struct iax2_registry *reg)
@@ -7771,7 +8038,7 @@
 	return RESULT_SUCCESS;
 }
 
-static int iax2_poke_noanswer(void *data)
+static void __iax2_poke_noanswer(void *data)
 {
 	struct iax2_peer *peer = data;
 	peer->pokeexpire = -1;
@@ -7786,6 +8053,14 @@
 	peer->lastms = -1;
 	/* Try again quickly */
 	peer->pokeexpire = ast_sched_add(sched, peer->pokefreqnotok, iax2_poke_peer_s, peer);
+}
+
+static int iax2_poke_noanswer(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+	if (schedule_action(__iax2_poke_noanswer, data))
+#endif		
+		__iax2_poke_noanswer(data);
 	return 0;
 }
 
@@ -7916,6 +8191,26 @@
 	}
 
 	return c;
+}
+
+static void *sched_thread(void *ignore)
+{
+	int count;
+	int res;
+	for (;;) {
+		res = ast_sched_wait(sched);
+		if ((res > 1000) || (res < 0))
+			res = 1000;
+		res = poll(NULL, 0, res);
+		if (res < 0) {
+			if ((errno != EAGAIN) && (errno != EINTR))
+				ast_log(LOG_WARNING, "poll failed: %s\n", strerror(errno));
+		}
+		count = ast_sched_runq(sched);
+		if (count >= 20)
+			ast_log(LOG_DEBUG, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
+	}
+	return NULL;
 }
 
 static void *network_thread(void *ignore)
@@ -7958,6 +8253,7 @@
 					/* We need reliable delivery.  Schedule a retransmission */
 					f->retries++;
 					f->retrans = ast_sched_add(sched, f->retrytime, attempt_transmit, f);
+					pthread_kill(schedthreadid, SIGURG);
 				}
 			}
 			f = f->next;
@@ -7969,16 +8265,10 @@
 			ast_log(LOG_DEBUG, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
 
 		/* Now do the IO, and run scheduled tasks */
-		res = ast_sched_wait(sched);
-		if ((res > 1000) || (res < 0))
-			res = 1000;
-		res = ast_io_wait(io, res);
+		res = ast_io_wait(io, -1);
 		if (res >= 0) {
 			if (res >= 20)
 				ast_log(LOG_DEBUG, "chan_iax2: ast_io_wait ran %d I/Os all at once\n", res);
-			count = ast_sched_runq(sched);
-			if (count >= 20)
-				ast_log(LOG_DEBUG, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
 		}
 	}
 	return NULL;
@@ -7986,7 +8276,29 @@
 
 static int start_network_thread(void)
 {
-	return ast_pthread_create(&netthreadid, NULL, network_thread, NULL);
+	int threadcount = 0;
+	int x;
+	ASTOBJ_CONTAINER_INIT(&idlelist);
+	ASTOBJ_CONTAINER_INIT(&activelist);
+	for (x = 0; x < iaxthreadcount; x++) {
+		struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
+		if (thread) {
+			ASTOBJ_INIT(thread);
+			thread->threadnum = ++threadcount;
+			if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+				ast_log(LOG_WARNING, "Failed to create new thread!\n");
+				free(thread);
+				thread = NULL;
+			}
+			ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+			ASTOBJ_UNREF(thread, destroy_helper);
+		}
+	}
+	ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
+	ast_pthread_create(&netthreadid, NULL, network_thread, NULL);
+	if (option_verbose > 1)
+		ast_verbose(VERBOSE_PREFIX_2 "%d helper threaads started\n", threadcount);
+	return 0;
 }
 
 static struct iax2_context *build_context(char *context)
@@ -8620,7 +8932,21 @@
 				portno = atoi(v->value);
 		} else if (!strcasecmp(v->name, "pingtime")) 
 			ping_time = atoi(v->value);
-		else if (!strcasecmp(v->name, "nochecksums")) {
+		else if (!strcasecmp(v->name, "iaxthreadcount")) {
+			if (reload) {
+				if (atoi(v->value) != iaxthreadcount)
+					ast_log(LOG_NOTICE, "Ignoring any changes to iaxthreadcount during reload\n");
+			} else {
+				iaxthreadcount = atoi(v->value);
+				if (iaxthreadcount < 1) {
+					ast_log(LOG_NOTICE, "iaxthreadcount must be at least 1.\n");
+					iaxthreadcount = 1;
+				} else if (iaxthreadcount > 256) {
+					ast_log(LOG_NOTICE, "limiting iaxthreadcount to 256\n");
+					iaxthreadcount = 256;
+				}
+			}
+		} else if (!strcasecmp(v->name, "nochecksums")) {
 #ifdef SO_NO_CHECK
 			if (ast_true(v->value))
 				nochecksums = 1;
@@ -8759,7 +9085,7 @@
 				amaflags = format;
 			}
 		} else if (!strcasecmp(v->name, "language")) {
-                        ast_copy_string(language, v->value, sizeof(language));
+			ast_copy_string(language, v->value, sizeof(language));
 		} /*else if (strcasecmp(v->name,"type")) */
 		/*	ast_log(LOG_WARNING, "Ignoring %s\n", v->name); */
 		v = v->next;
@@ -9366,6 +9692,10 @@
 "Usage: iax2 show netstats\n"
 "       Lists network status for all currently active IAX channels.\n";
 
+static char show_threads_usage[] = 
+"Usage: iax2 show threads\n"
+"       Lists status of IAX helper threads\n";
+
 static char show_peers_usage[] = 
 "Usage: iax2 show peers [registered] [like <pattern>]\n"
 "       Lists all known IAX2 peers.\n"
@@ -9445,6 +9775,8 @@
 	  "Show active IAX channel netstats", show_netstats_usage },
 	{ { "iax2", "show", "peers", NULL }, iax2_show_peers,
 	  "Show defined IAX peers", show_peers_usage },
+	{ { "iax2", "show", "threads", NULL }, iax2_show_threads,
+	  "Show IAX helper thread info", show_threads_usage },
 	{ { "iax2", "show", "registry", NULL }, iax2_show_registry,
 	  "Show IAX registration status", show_reg_usage },
 	{ { "iax2", "debug", NULL }, iax2_do_debug,
@@ -9481,6 +9813,23 @@
 		pthread_cancel(netthreadid);
 		pthread_join(netthreadid, NULL);
 	}
+	if (schedthreadid != AST_PTHREADT_NULL) {
+		pthread_cancel(schedthreadid);
+		pthread_join(schedthreadid, NULL);
+	}
+	while (idlelist.head || activelist.head) {
+		ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
+			iterator->halt = 1;
+			pthread_kill(iterator->threadid, SIGURG);
+		});
+		ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
+			iterator->halt = 1;
+			pthread_kill(iterator->threadid, SIGURG);
+		});
+		usleep(100000);
+	}
+	ASTOBJ_CONTAINER_DESTROY(&idlelist);
+	ASTOBJ_CONTAINER_DESTROY(&activelist);
 	ast_netsock_release(netsock);
 	for (x=0;x<IAX_MAX_CALLS;x++)
 		if (iaxs[x])

Modified: trunk/configs/iax.conf.sample
URL: http://svn.digium.com/view/asterisk/trunk/configs/iax.conf.sample?rev=11192&r1=11191&r2=11192&view=diff
==============================================================================
--- trunk/configs/iax.conf.sample (original)
+++ trunk/configs/iax.conf.sample Sun Feb 26 14:27:14 2006
@@ -162,6 +162,10 @@
 ; a registration expiration interval (in seconds).
 ; minregexpire = 60
 ; maxregexpire = 60
+;
+; IAX helper threads
+; Establishes the number of iax helper threads to handle I/O.
+; iaxthreadcount = 10
 ;
 ; We can register with another IAX server to let him know where we are
 ; in case we have a dynamic IP address for example



More information about the asterisk-commits mailing list