[asterisk-commits] file: branch file/scaling r58098 - in /team/file/scaling: apps/ include/aster...

asterisk-commits at lists.digium.com asterisk-commits at lists.digium.com
Tue Mar 6 14:46:05 MST 2007


Author: file
Date: Tue Mar  6 15:46:04 2007
New Revision: 58098

URL: http://svn.digium.com/view/asterisk?view=rev&rev=58098
Log:
Finish up epoll support. Now channels get added in before the bridge loops happen, and removed afterwards.

Modified:
    team/file/scaling/apps/app_dial.c
    team/file/scaling/include/asterisk/channel.h
    team/file/scaling/main/channel.c
    team/file/scaling/main/rtp.c

Modified: team/file/scaling/apps/app_dial.c
URL: http://svn.digium.com/view/asterisk/team/file/scaling/apps/app_dial.c?view=diff&rev=58098&r1=58097&r2=58098
==============================================================================
--- team/file/scaling/apps/app_dial.c (original)
+++ team/file/scaling/apps/app_dial.c Tue Mar  6 15:46:04 2007
@@ -535,6 +535,9 @@
 	struct ast_channel *peer = NULL;
 	/* single is set if only one destination is enabled */
 	int single = outgoing && !outgoing->next && !ast_test_flag(outgoing, OPT_MUSICBACK | OPT_RINGBACK);
+#ifdef HAVE_EPOLL
+	struct chanlist *epollo;
+#endif
 	
 	if (single) {
 		/* Turn off hold music, etc */
@@ -542,7 +545,11 @@
 		/* If we are calling a single channel, make them compatible for in-band tone purpose */
 		ast_channel_make_compatible(outgoing->chan, in);
 	}
-	
+
+#ifdef HAVE_EPOLL
+	for (epollo = outgoing; epollo; epollo = epollo->next)
+		ast_poll_channel_add(in, epollo->chan);
+#endif	
 	
 	while (*to && !peer) {
 		struct chanlist *o;
@@ -801,6 +808,11 @@
 			ast_verbose(VERBOSE_PREFIX_3 "Nobody picked up in %d ms\n", orig);
 	}
 
+#ifdef HAVE_EPOLL
+	for (epollo = outgoing; epollo; epollo = epollo->next)
+		ast_poll_channel_del(in, epollo->chan);
+#endif
+
 	return peer;
 }
 

Modified: team/file/scaling/include/asterisk/channel.h
URL: http://svn.digium.com/view/asterisk/team/file/scaling/include/asterisk/channel.h?view=diff&rev=58098&r1=58097&r2=58098
==============================================================================
--- team/file/scaling/include/asterisk/channel.h (original)
+++ team/file/scaling/include/asterisk/channel.h Tue Mar  6 15:46:04 2007
@@ -317,6 +317,8 @@
 struct ast_channel_spy_list;	/*!< \todo Add explanation here */
 struct ast_channel_whisper_buffer;	/*!< \todo Add explanation here */
 
+struct ast_epoll_data;
+
 /*!
  * The high bit of the frame count is used as a debug marker, so
  * increments of the counters must be done with care.
@@ -494,6 +496,7 @@
 
 #ifdef HAVE_EPOLL
 	int epfd;
+	struct ast_epoll_data *epfd_data[AST_MAX_FDS];
 #endif
 };
 
@@ -1188,6 +1191,12 @@
 /*! Set the file descriptor on the channel */
 void ast_channel_set_fd(struct ast_channel *chan, int which, int fd);
 
+/*! Add a channel to an optimized waitfor */
+void ast_poll_channel_add(struct ast_channel *chan0, struct ast_channel *chan1);
+
+/*! Delete a channel from an optimized waitfor */
+void ast_poll_channel_del(struct ast_channel *chan0, struct ast_channel *chan1);
+
 /*! Start a tone going */
 int ast_tonepair_start(struct ast_channel *chan, int freq1, int freq2, int duration, int vol);
 /*! Stop a tone from playing */

Modified: team/file/scaling/main/channel.c
URL: http://svn.digium.com/view/asterisk/team/file/scaling/main/channel.c?view=diff&rev=58098&r1=58097&r2=58098
==============================================================================
--- team/file/scaling/main/channel.c (original)
+++ team/file/scaling/main/channel.c Tue Mar  6 15:46:04 2007
@@ -90,6 +90,11 @@
 	struct ast_slinfactory sf;
 	unsigned int original_format;
 	struct ast_trans_pvt *path;
+};
+
+struct ast_epoll_data {
+	struct ast_channel *chan;
+	int which;
 };
 
 /* uncomment if you have problems with 'monitoring' synchronized files */
@@ -653,8 +658,12 @@
 	tmp->epfd = epoll_create(25);
 #endif
 
-	for (x = 0; x < AST_MAX_FDS; x++)
+	for (x = 0; x < AST_MAX_FDS; x++) {
 		tmp->fds[x] = -1;
+#ifdef HAVE_EPOLL
+		tmp->epfd_data[x] = NULL;
+#endif
+	}
 
 #ifdef HAVE_ZAPTEL
 	tmp->timingfd = open("/dev/zap/timer", O_RDWR);
@@ -1039,6 +1048,9 @@
 void ast_channel_free(struct ast_channel *chan)
 {
 	int fd;
+#ifdef HAVE_EPOLL
+	int i;
+#endif
 	struct ast_var_t *vardata;
 	struct ast_frame *f;
 	struct varshead *headp;
@@ -1092,6 +1104,10 @@
 	if ((fd = chan->timingfd) > -1)
 		close(fd);
 #ifdef HAVE_EPOLL
+	for (i = 0; i < AST_MAX_FDS; i++) {
+		if (chan->epfd_data[i])
+			free(chan->epfd_data[i]);
+	}
 	close(chan->epfd);
 #endif
 	while ((f = AST_LIST_REMOVE_HEAD(&chan->readq, frame_list)))
@@ -1414,15 +1430,76 @@
 {
 #ifdef HAVE_EPOLL
 	struct epoll_event ev;
-
-	if (chan->fds[which] > -1)
+	struct ast_epoll_data *aed = NULL;
+
+	if (chan->fds[which] > -1) {
 		epoll_ctl(chan->epfd, EPOLL_CTL_DEL, chan->fds[which], &ev);
-
-	ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
-	ev.data.fd = which;
-	epoll_ctl(chan->epfd, EPOLL_CTL_ADD, fd, &ev);
+		aed = chan->epfd_data[which];
+	}
+
+	/* If this new fd is valid, add it to the epoll */
+	if (fd > -1) {
+		if (!aed && (!(aed = ast_calloc(1, sizeof(*aed)))))
+			return;
+		
+		chan->epfd_data[which] = aed;
+		aed->chan = chan;
+		aed->which = which;
+		
+		ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
+		ev.data.ptr = aed;
+		epoll_ctl(chan->epfd, EPOLL_CTL_ADD, fd, &ev);
+	} else if (aed) {
+		/* We don't have to keep around this epoll data structure now */
+		free(aed);
+		chan->epfd_data[which] = NULL;
+	}
 #endif
 	chan->fds[which] = fd;
+	return;
+}
+
+/*! Add a channel to an optimized waitfor */
+void ast_poll_channel_add(struct ast_channel *chan0, struct ast_channel *chan1)
+{
+#ifdef HAVE_EPOLL
+	struct epoll_event ev;
+	int i = 0;
+
+	if (chan0->epfd == -1)
+		return;
+
+	/* Iterate through the file descriptors on chan1, adding them to chan0 */
+	for (i = 0; i < AST_MAX_FDS; i++) {
+		if (chan1->fds[i] == -1)
+			continue;
+		ev.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
+		ev.data.ptr = chan1->epfd_data[i];
+		epoll_ctl(chan0->epfd, EPOLL_CTL_ADD, chan1->fds[i], &ev);
+	}
+
+#endif
+	return;
+}
+
+/*! Delete a channel from an optimized waitfor */
+void ast_poll_channel_del(struct ast_channel *chan0, struct ast_channel *chan1)
+{
+#ifdef HAVE_EPOLL
+	struct epoll_event ev;
+	int i = 0;
+
+	if (chan0->epfd == -1)
+		return;
+
+	for (i = 0; i < AST_MAX_FDS; i++) {
+		if (chan1->fds[i] == -1)
+			continue;
+		epoll_ctl(chan0->epfd, EPOLL_CTL_DEL, chan1->fds[i], &ev);
+	}
+
+#endif
+	return;
 }
 
 /*! \brief Softly hangup a channel, don't lock */
@@ -1816,15 +1893,13 @@
 		*exception = 0;
 	
 	/* Perform any pending masquerades */
-	for (x=0; x < n; x++) {
+	for (x = 0; x < n; x++) {
 		ast_channel_lock(c[x]);
-		if (c[x]->masq) {
-			if (ast_do_masquerade(c[x])) {
-				ast_log(LOG_WARNING, "Masquerade failed\n");
-				*ms = -1;
-				ast_channel_unlock(c[x]);
-				return NULL;
-			}
+		if (c[x]->masq && ast_do_masquerade(c[x])) {
+			ast_log(LOG_WARNING, "Masquerade failed\n");
+			*ms = -1;
+			ast_channel_unlock(c[x]);
+			return NULL;
 		}
 		if (c[x]->whentohangup) {
 			if (!whentohangup)
@@ -1854,8 +1929,8 @@
 	 * individual fd's must have priority over channel fds.
 	 */
 	max = 0;
-	for (x=0; x<n; x++) {
-		for (y=0; y<AST_MAX_FDS; y++) {
+	for (x = 0; x < n; x++) {
+		for (y = 0; y < AST_MAX_FDS; y++) {
 			fdmap[max].fdno = y;  /* fd y is linked to this pfds */
 			fdmap[max].chan = x;  /* channel x is linked to this pfds */
 			max += ast_add_fd(&pfds[max], c[x]->fds[y]);
@@ -1863,7 +1938,7 @@
 		CHECK_BLOCKING(c[x]);
 	}
 	/* Add the individual fds */
-	for (x=0; x<nfds; x++) {
+	for (x = 0; x < nfds; x++) {
 		fdmap[max].chan = -1;
 		max += ast_add_fd(&pfds[max], fds[x]);
 	}
@@ -1883,7 +1958,7 @@
 	} else {
 		res = poll(pfds, max, rms);
 	}
-	for (x=0; x<n; x++)
+	for (x = 0; x < n; x++)
 		ast_clear_flag(c[x], AST_FLAG_BLOCKING);
 	if (res < 0) { /* Simulate a timeout if we were interrupted */
 		if (errno != EINTR)
@@ -1892,7 +1967,7 @@
 	}
 	if (whentohangup) {   /* if we have a timeout, check who expired */
 		time(&now);
-		for (x=0; x<n; x++) {
+		for (x = 0; x < n; x++) {
 			if (c[x]->whentohangup && now >= c[x]->whentohangup) {
 				c[x]->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
 				if (winner == NULL)
@@ -1939,11 +2014,13 @@
 #ifdef HAVE_EPOLL
 static struct ast_channel *ast_waitfor_nandfds_simple(struct ast_channel *chan, int *ms)
 {
+	struct timeval start = { 0 , 0 };
 	int res = 0;
 	struct epoll_event ev[1];
 	long whentohangup = 0, rms = *ms;
 	time_t now;
 	struct ast_channel *winner = NULL;
+	struct ast_epoll_data *aed = NULL;
 
 	ast_channel_lock(chan);
 
@@ -1975,6 +2052,9 @@
 	/* Time to make this channel block... */
 	CHECK_BLOCKING(chan);
 
+	if (*ms > 0)
+		start = ast_tvnow();
+
 	/* We don't have to add any file descriptors... they are already added, we just have to wait! */
 	res = epoll_wait(chan->epfd, ev, 1, rms);
 
@@ -2004,25 +2084,124 @@
 	}
 
 	/* See what events are pending */
-	chan->fdno = ev[0].data.fd;
+	aed = ev[0].data.ptr;
+	chan->fdno = aed->which;
 	if (ev[0].events & EPOLLPRI)
 		ast_set_flag(chan, AST_FLAG_EXCEPTION);
 	else
 		ast_clear_flag(chan, AST_FLAG_EXCEPTION);
 
+	if (*ms > 0) {
+		*ms -= ast_tvdiff_ms(ast_tvnow(), start);
+		if (*ms < 0)
+			*ms = 0;
+	}
+
 	return chan;
+}
+
+static struct ast_channel *ast_waitfor_nandfds_complex(struct ast_channel **c, int n, int *ms)
+{
+	struct timeval start = { 0 , 0 };
+	int res = 0, i;
+	struct epoll_event ev[25] = { { 0, } };
+	long whentohangup = 0, diff, rms = *ms;
+	time_t now;
+	struct ast_channel *winner = NULL;
+
+	for (i = 0; i < n; i++) {
+		ast_channel_lock(c[i]);
+		if (c[i]->masq && ast_do_masquerade(c[i])) {
+			ast_log(LOG_WARNING, "Masquerade failed\n");
+			*ms = -1;
+			ast_channel_unlock(c[i]);
+			return NULL;
+		}
+		if (c[i]->whentohangup) {
+			if (!whentohangup)
+				time(&now);
+			if ((diff = c[i]->whentohangup - now) < 1) {
+				c[i]->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
+				ast_channel_unlock(c[i]);
+				return c[i];
+			}
+			if (!whentohangup || (diff < whentohangup))
+				whentohangup = diff;
+		}
+		ast_channel_unlock(c[i]);
+		CHECK_BLOCKING(c[i]);
+	}
+
+	rms = *ms;
+	if (whentohangup) {
+		rms = whentohangup * 1000;
+		if (*ms >= 0 && *ms < rms)
+			rms = *ms;
+	}
+
+	if (*ms > 0)
+		start = ast_tvnow();
+
+	res = epoll_wait(c[0]->epfd, ev, 25, rms);
+
+	for (i = 0; i < n; i++)
+		ast_clear_flag(c[i], AST_FLAG_BLOCKING);
+
+	if (res < 0) {
+		if (errno != EINTR)
+			*ms = -1;
+		return NULL;
+	}
+
+	if (whentohangup) {
+		time(&now);
+		for (i = 0; i < n; i++) {
+			if (c[i]->whentohangup && now >= c[i]->whentohangup) {
+				c[i]->_softhangup |= AST_SOFTHANGUP_TIMEOUT;
+				if (!winner)
+					winner = c[i];
+			}
+		}
+	}
+
+	if (!res) {
+		*ms = 0;
+		return winner;
+	}
+
+	for (i = 0; i < 25; i++) {
+		struct ast_epoll_data *aed = ev[i].data.ptr;
+
+		if (!ev[i].events || !aed)
+			continue;
+
+		winner = aed->chan;
+		if (ev[i].events & EPOLLPRI)
+			ast_set_flag(winner, AST_FLAG_EXCEPTION);
+		else
+			ast_clear_flag(winner, AST_FLAG_EXCEPTION);
+		winner->fdno = aed->which;
+	}
+
+	if (*ms > 0) {
+		*ms -= ast_tvdiff_ms(ast_tvnow(), start);
+		if (*ms < 0)
+			*ms = 0;
+	}
+
+	return winner;
 }
 
 struct ast_channel *ast_waitfor_nandfds(struct ast_channel **c, int n, int *fds, int nfds,
 					int *exception, int *outfd, int *ms)
 {
 	/* If no epoll file descriptor is available resort to classic nandfds */
-	if (!n || c[0]->epfd == -1)
+	if (!n || nfds || c[0]->epfd == -1)
 		return ast_waitfor_nandfds_classic(c, n, fds, nfds, exception, outfd, ms);
 	else if (!nfds && n == 1)
 		return ast_waitfor_nandfds_simple(c[0], ms);
-
-	return NULL;
+	else
+		return ast_waitfor_nandfds_complex(c, n, ms);
 }
 #endif
 
@@ -3817,6 +3996,8 @@
 	/* Check the need of a jitterbuffer for each channel */
 	jb_in_use = ast_jb_do_usecheck(c0, c1);
 
+	ast_poll_channel_add(c0, c1);
+
 	for (;;) {
 		struct ast_channel *who, *other;
 
@@ -3924,11 +4105,16 @@
 		/* XXX do we want to pass on also frames not matched above ? */
 		ast_frfree(f);
 
+#ifndef HAVE_EPOLL
 		/* Swap who gets priority */
 		cs[2] = cs[0];
 		cs[0] = cs[1];
 		cs[1] = cs[2];
-	}
+#endif
+	}
+
+	ast_poll_channel_del(c0, c1);
+
 	return res;
 }
 

Modified: team/file/scaling/main/rtp.c
URL: http://svn.digium.com/view/asterisk/team/file/scaling/main/rtp.c?view=diff&rev=58098&r1=58097&r2=58098
==============================================================================
--- team/file/scaling/main/rtp.c (original)
+++ team/file/scaling/main/rtp.c Tue Mar  6 15:46:04 2007
@@ -2894,6 +2894,8 @@
 	ast_channel_unlock(c0);
 	ast_channel_unlock(c1);
 
+	ast_poll_channel_add(c0, c1);
+
 	/* Throw our channels into the structure and enter the loop */
 	cs[0] = c0;
 	cs[1] = c1;
@@ -2911,6 +2913,7 @@
 			if (c1->tech_pvt == pvt1)
 				if (pr1->set_rtp_peer(c1, NULL, NULL, NULL, 0, 0))
 					ast_log(LOG_WARNING, "Channel '%s' failed to break RTP bridge\n", c1->name);
+			ast_poll_channel_del(c0, c1);
 			return AST_BRIDGE_RETRY;
 		}
 
@@ -2997,6 +3000,7 @@
 			if (c1->tech_pvt == pvt1)
 				if (pr1->set_rtp_peer(c1, NULL, NULL, NULL, 0, 0))
 					ast_log(LOG_WARNING, "Channel '%s' failed to break RTP bridge\n", c1->name);
+			ast_poll_channel_del(c0, c1);
 			return AST_BRIDGE_COMPLETE;
 		} else if ((fr->frametype == AST_FRAME_CONTROL) && !(flags & AST_BRIDGE_IGNORE_SIGS)) {
 			if ((fr->subclass == AST_CONTROL_HOLD) ||
@@ -3038,10 +3042,14 @@
 			ast_frfree(fr);
 		}
 		/* Swap priority */
+#ifndef HAVE_EPOLL
 		cs[2] = cs[0];
 		cs[0] = cs[1];
 		cs[1] = cs[2];
-	}
+#endif
+	}
+
+	ast_poll_channel_del(c0, c1);
 
 	return AST_BRIDGE_FAILED;
 }
@@ -3167,6 +3175,8 @@
 	/* Now let go of the channel locks and be on our way */
 	ast_channel_unlock(c0);
 	ast_channel_unlock(c1);
+
+	ast_poll_channel_add(c0, c1);
 
 	/* Go into a loop forwarding frames until we don't need to anymore */
 	cs[0] = c0;
@@ -3274,6 +3284,8 @@
 	p2p_set_bridge(p0, NULL);
 	p2p_set_bridge(p1, NULL);
 
+	ast_poll_channel_del(c0, c1);
+
 	return res;
 }
 



More information about the asterisk-commits mailing list