[Asterisk-cvs] asterisk/channels chan_iax2.c,1.131,1.132

markster at lists.digium.com markster at lists.digium.com
Tue Apr 27 11:12:39 CDT 2004


Update of /usr/cvsroot/asterisk/channels
In directory mongoose.digium.com:/tmp/cvs-serv3314/channels

Modified Files:
	chan_iax2.c 
Log Message:
Extreme IAX2 trunking performance improvements


Index: chan_iax2.c
===================================================================
RCS file: /usr/cvsroot/asterisk/channels/chan_iax2.c,v
retrieving revision 1.131
retrieving revision 1.132
diff -u -d -r1.131 -r1.132
--- chan_iax2.c	26 Apr 2004 11:38:21 -0000	1.131
+++ chan_iax2.c	27 Apr 2004 15:18:55 -0000	1.132
@@ -230,10 +230,6 @@
 	int delme;						/* I need to be deleted */
 	int temponly;					/* I'm only a temp */
 	int trunk;						/* Treat as an IAX trunking */
-	struct timeval txtrunktime;		/* Transmit trunktime */
-	struct timeval rxtrunktime;		/* Receive trunktime */
-	struct timeval lasttxtime;		/* Last transmitted trunktime */
-	unsigned int lastsent;			/* Last sent time */
 
 	/* Qualification */
 	int callno;					/* Call number of POKE request */
@@ -246,6 +242,28 @@
 	int notransfer;
 };
 
+#define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr))
+
+static struct iax2_trunk_peer {
+	ast_mutex_t lock;
+	struct sockaddr_in addr;
+	struct timeval txtrunktime;		/* Transmit trunktime */
+	struct timeval rxtrunktime;		/* Receive trunktime */
+	struct timeval lasttxtime;		/* Last transmitted trunktime */
+	struct timeval trunkact;		/* Last trunk activity */
+	unsigned int lastsent;			/* Last sent time */
+	/* Trunk data and length */
+	unsigned char *trunkdata;
+	unsigned int trunkdatalen;
+	unsigned int trunkdataalloc;
+	struct iax2_trunk_peer *next;
+	int trunkerror;
+	int calls;
+	int firstcallno;
+} *tpeers = NULL;
+
+static ast_mutex_t tpeerlock = AST_MUTEX_INITIALIZER;
+
 struct iax_firmware {
 	struct iax_firmware *next;
 	int fd;
@@ -290,7 +308,8 @@
 #define MAX_RETRY_TIME  10000
 #define MAX_JITTER_BUFFER 50
 
-#define MAX_TRUNKDATA	640		/* 40ms, uncompressed linear */
+#define DEFAULT_TRUNKDATA	640 * 10		/* 40ms, uncompressed linear * 10 channels */
+#define MAX_TRUNKDATA		640 * 200		/* 40ms, uncompressed linear * 200 channels */
 
 /* If we have more than this much excess real jitter buffer, srhink it. */
 static int max_jitter_buffer = MAX_JITTER_BUFFER;
@@ -426,10 +445,6 @@
 	int amaflags;
 	/* This is part of a trunk interface */
 	int trunk;
-	/* Trunk data and length */
-	unsigned char trunkdata[MAX_TRUNKDATA];
-	unsigned int trunkdatalen;
-	int trunkerror;
 	struct iax2_dpcache *dpentries;
 	int notransfer;		/* do we want native bridging */
 };
@@ -2530,39 +2545,38 @@
 	return tmp;
 }
 
-static unsigned int calc_txpeerstamp(struct iax2_peer *peer, int sampms)
+static unsigned int calc_txpeerstamp(struct iax2_trunk_peer *tpeer, int sampms, struct timeval *tv)
 {
-	struct timeval tv;
 	long int mssincetx;
 	long int ms, pred;
 
-	gettimeofday(&tv, NULL);
-	mssincetx = (tv.tv_sec - peer->lasttxtime.tv_sec) * 1000 + (tv.tv_usec - peer->lasttxtime.tv_usec) / 1000;
+	tpeer->trunkact = *tv;
+	mssincetx = (tv->tv_sec - tpeer->lasttxtime.tv_sec) * 1000 + (tv->tv_usec - tpeer->lasttxtime.tv_usec) / 1000;
 	if (mssincetx > 5000) {
 		/* If it's been at least 5 seconds since the last time we transmitted on this trunk, reset our timers */
-		peer->txtrunktime.tv_sec = tv.tv_sec;
-		peer->txtrunktime.tv_usec = tv.tv_usec;
-		peer->lastsent = 999999;
+		tpeer->txtrunktime.tv_sec = tv->tv_sec;
+		tpeer->txtrunktime.tv_usec = tv->tv_usec;
+		tpeer->lastsent = 999999;
 	}
 	/* Update last transmit time now */
-	peer->lasttxtime.tv_sec = tv.tv_sec;
-	peer->lasttxtime.tv_usec = tv.tv_usec;
+	tpeer->lasttxtime.tv_sec = tv->tv_sec;
+	tpeer->lasttxtime.tv_usec = tv->tv_usec;
 	
 	/* Calculate ms offset */
-	ms = (tv.tv_sec - peer->txtrunktime.tv_sec) * 1000 + (tv.tv_usec - peer->txtrunktime.tv_usec) / 1000;
+	ms = (tv->tv_sec - tpeer->txtrunktime.tv_sec) * 1000 + (tv->tv_usec - tpeer->txtrunktime.tv_usec) / 1000;
 	/* Predict from last value */
-	pred = peer->lastsent + sampms;
+	pred = tpeer->lastsent + sampms;
 	if (abs(ms - pred) < 640)
 		ms = pred;
 	
 	/* We never send the same timestamp twice, so fudge a little if we must */
-	if (ms == peer->lastsent)
-		ms = peer->lastsent + 1;
-	peer->lastsent = ms;
+	if (ms == tpeer->lastsent)
+		ms = tpeer->lastsent + 1;
+	tpeer->lastsent = ms;
 	return ms;
 }
 
-static unsigned int fix_peerts(struct iax2_peer *peer, int callno, unsigned int ts)
+static unsigned int fix_peerts(struct iax2_trunk_peer *peer, int callno, unsigned int ts)
 {
 	long ms;	/* NOT unsigned */
 	if (!iaxs[callno]->rxcore.tv_sec && !iaxs[callno]->rxcore.tv_usec) {
@@ -2677,6 +2691,85 @@
 	return ms;
 }
 
+struct iax2_trunk_peer *find_tpeer(struct sockaddr_in *sin)
+{
+	struct iax2_trunk_peer *tpeer;
+	/* Finds and locks trunk peer */
+	ast_mutex_lock(&tpeerlock);
+	tpeer = tpeers;
+	while(tpeer) {
+		/* We don't lock here because tpeer->addr *never* changes */
+		if (!inaddrcmp(&tpeer->addr, sin)) {
+			ast_mutex_lock(&tpeer->lock);
+			break;
+		}
+		tpeer = tpeer->next;
+	}
+	if (!tpeer) {
+		tpeer = malloc(sizeof(struct iax2_trunk_peer));
+		if (tpeer) {
+			memset(tpeer, 0, sizeof(struct iax2_trunk_peer));
+			ast_mutex_init(&tpeer->lock);
+			tpeer->lastsent = 9999;
+			memcpy(&tpeer->addr, sin, sizeof(tpeer->addr));
+			gettimeofday(&tpeer->trunkact, NULL);
+			ast_mutex_lock(&tpeer->lock);
+			tpeer->next = tpeers;
+			tpeers = tpeer;
+			ast_log(LOG_DEBUG, "Created trunk peer for '%s:%d'\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+		}
+	}
+	ast_mutex_unlock(&tpeerlock);
+	return tpeer;
+}
+
+static int iax2_trunk_queue(struct chan_iax2_pvt *pvt, struct ast_frame *f)
+{
+	struct iax2_trunk_peer *tpeer;
+	void *tmp, *ptr;
+	struct ast_iax2_meta_trunk_entry *met;
+	tpeer = find_tpeer(&pvt->addr);
+	if (tpeer) {
+		if (tpeer->trunkdatalen + f->datalen + 4 >= tpeer->trunkdataalloc) {
+			/* Need to reallocate space */
+			if (tpeer->trunkdataalloc < MAX_TRUNKDATA) {
+				tmp = realloc(tpeer->trunkdata, tpeer->trunkdataalloc + DEFAULT_TRUNKDATA + IAX2_TRUNK_PREFACE);
+				if (tmp) {
+					tpeer->trunkdataalloc += DEFAULT_TRUNKDATA;
+					tpeer->trunkdata = tmp;
+					ast_log(LOG_DEBUG, "Expanded trunk '%s:%d' to %d bytes\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), tpeer->trunkdataalloc);
+				} else {
+					ast_log(LOG_WARNING, "Insufficient memory to expand trunk data to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+					ast_mutex_unlock(&tpeer->lock);
+					return -1;
+				}
+			} else {
+				ast_log(LOG_WARNING, "Maximum trunk data space exceeded to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+				ast_mutex_unlock(&tpeer->lock);
+				return -1;
+			}
+		}
+		
+		/* Append to meta frame */
+		ptr = tpeer->trunkdata + IAX2_TRUNK_PREFACE + tpeer->trunkdatalen;
+		met = (struct ast_iax2_meta_trunk_entry *)ptr;
+		/* Store call number and length in meta header */
+		met->callno = htons(pvt->callno);
+		met->len = htons(f->datalen);
+		/* Advance pointers/decrease length past trunk entry header */
+		ptr += sizeof(struct ast_iax2_meta_trunk_entry);
+		tpeer->trunkdatalen += sizeof(struct ast_iax2_meta_trunk_entry);
+		/* Copy actual trunk data */
+		memcpy(ptr, f->data, f->datalen);
+		tpeer->trunkdatalen += f->datalen;
+		if (!tpeer->firstcallno)
+			tpeer->firstcallno = pvt->callno;
+		tpeer->calls++;
+		ast_mutex_unlock(&tpeer->lock);
+	}
+	return 0;
+}
+
 static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned int ts, int seqno, int now, int transfer, int final)
 {
 	/* Queue a packet for delivery on a given private structure.  Use "ts" for
@@ -2794,17 +2887,7 @@
 			res = iax2_transmit(fr);
 	} else {
 		if (pvt->trunk) {
-			/* Queue for transmission in a meta frame */
-			if ((sizeof(pvt->trunkdata) - pvt->trunkdatalen) >= fr->af.datalen) {
-				memcpy(pvt->trunkdata + pvt->trunkdatalen, fr->af.data, fr->af.datalen);
-				pvt->trunkdatalen += fr->af.datalen;
-				res = 0;
-				pvt->trunkerror = 0;
-			} else {
-				if (!pvt->trunkerror)
-					ast_log(LOG_WARNING, "Out of trunk data space on call number %d, dropping\n", pvt->callno);
-				pvt->trunkerror = 1;
-			}
+			iax2_trunk_queue(pvt, &fr->af);
 			res = 0;
 		} else if (fr->af.frametype == AST_FRAME_VIDEO) {
 			/* Video frame have no sequence number */
@@ -4192,96 +4275,70 @@
 	return 0;
 }
 
-static int send_trunk(struct iax2_peer *peer)
+static int send_trunk(struct iax2_trunk_peer *tpeer, struct timeval *now)
 {
-	int x;
-	int calls = 0;
 	int res = 0;
-	int firstcall = 0;
-	unsigned char buf[65536 + sizeof(struct iax_frame)], *ptr;
-	int len = 65536;
 	struct iax_frame *fr;
 	struct ast_iax2_meta_hdr *meta;
 	struct ast_iax2_meta_trunk_hdr *mth;
-	struct ast_iax2_meta_trunk_entry *met;
+	int calls = 0;
 	
 	/* Point to frame */
-	fr = (struct iax_frame *)buf;
+	fr = (struct iax_frame *)tpeer->trunkdata;
 	/* Point to meta data */
 	meta = (struct ast_iax2_meta_hdr *)fr->afdata;
 	mth = (struct ast_iax2_meta_trunk_hdr *)meta->data;
-	/* Point past meta data for first meta trunk entry */
-	ptr = fr->afdata + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
-	len -= sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
-	
-	/* Search through trunked calls for a match with this peer */
-	for (x=TRUNK_CALL_START;x<maxtrunkcall; x++) {
-		ast_mutex_lock(&iaxsl[x]);
-#if 0
-		if (iaxtrunkdebug)
-			ast_verbose("Call %d is at %s:%d (%d)\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port), iaxs[x]->addr.sin_family);
-#endif
-		if (iaxs[x] && iaxs[x]->trunk && iaxs[x]->trunkdatalen && !inaddrcmp(&iaxs[x]->addr, &peer->addr)) {
-			if (iaxtrunkdebug)
-				ast_verbose(" -- Sending call %d via trunk to %s:%d\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port));
-			if (len >= iaxs[x]->trunkdatalen + sizeof(struct ast_iax2_meta_trunk_entry)) {
-				met = (struct ast_iax2_meta_trunk_entry *)ptr;
-				/* Store call number and length in meta header */
-				met->callno = htons(x);
-				met->len = htons(iaxs[x]->trunkdatalen);
-				/* Advance pointers/decrease length past trunk entry header */
-				ptr += sizeof(struct ast_iax2_meta_trunk_entry);
-				len -= sizeof(struct ast_iax2_meta_trunk_entry);
-				/* Copy actual trunk data */
-				memcpy(ptr, iaxs[x]->trunkdata, iaxs[x]->trunkdatalen);
-				/* Advance pointeres/decrease length for actual data */
-				ptr += iaxs[x]->trunkdatalen;
-				len -= iaxs[x]->trunkdatalen;
-			} else 
-				ast_log(LOG_WARNING, "Out of space in frame for trunking call %d\n", x);
-			iaxs[x]->trunkdatalen = 0;
-			calls++;
-			if (!firstcall)
-				firstcall = x;
-		}
-		ast_mutex_unlock(&iaxsl[x]);
-	}
-	if (calls) {
+	if (tpeer->trunkdatalen) {
 		/* We're actually sending a frame, so fill the meta trunk header and meta header */
 		meta->zeros = 0;
 		meta->metacmd = IAX_META_TRUNK;
 		meta->cmddata = 0;
-		mth->ts = htonl(calc_txpeerstamp(peer, trunkfreq));
+		mth->ts = htonl(calc_txpeerstamp(tpeer, trunkfreq, now));
 		/* And the rest of the ast_iax2 header */
 		fr->direction = DIRECTION_OUTGRESS;
 		fr->retrans = -1;
 		fr->transfer = 0;
 		/* Any appropriate call will do */
-		fr->callno = firstcall;
+		fr->callno = tpeer->firstcallno;
 		fr->data = fr->afdata;
-		fr->datalen = 65536 - len;
+		fr->datalen = tpeer->trunkdatalen + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
 #if 0
 		ast_log(LOG_DEBUG, "Trunking %d calls in %d bytes, ts=%d\n", calls, fr->datalen, ntohl(mth->ts));
 #endif		
 		res = send_packet(fr);
+		calls = tpeer->calls;
+		/* Reset transmit trunk side data */
+		tpeer->trunkdatalen = 0;
+		tpeer->calls = 0;
+		tpeer->firstcallno = 0;
 	}
 	if (res < 0)
 		return res;
 	return calls;
 }
 
+static inline int iax2_trunk_expired(struct iax2_trunk_peer *tpeer, struct timeval *now)
+{
+	/* Drop when trunk is about 5 seconds idle */
+	if (now->tv_sec > tpeer->trunkact.tv_sec + 5) 
+		return 1;
+	return 0;
+}
+
 static int timing_read(int *id, int fd, short events, void *cbdata)
 {
 	char buf[1024];
 	int res;
-	struct iax2_peer *peer;
+	struct iax2_trunk_peer *tpeer, *prev = NULL, *drop=NULL;
 	int processed = 0;
 	int totalcalls = 0;
 #ifdef ZT_TIMERACK
 	int x = 1;
 #endif
+	struct timeval now;
 	if (iaxtrunkdebug)
 		ast_verbose("Beginning trunk processing\n");
+	gettimeofday(&now, NULL);
 	if (events & AST_IO_PRI) {
 #ifdef ZT_TIMERACK
 		/* Great, this is a timing interface, just call the ioctl */
@@ -4299,20 +4356,43 @@
 		}
 	}
 	/* For each peer that supports trunking... */
-	ast_mutex_lock(&peerl.lock);
-	peer = peerl.peers;
-	while(peer) {
-		if (peer->trunk) {
-			processed++;
-			res = send_trunk(peer);
+	ast_mutex_lock(&tpeerlock);
+	tpeer = tpeers;
+	while(tpeer) {
+		processed++;
+		ast_mutex_lock(&tpeer->lock);
+		/* We can drop a single tpeer per pass.  That makes all this logic
+		   substantially easier */
+		if (!drop && iax2_trunk_expired(tpeer, &now)) {
+			/* Take it out of the list, but don't free it yet, because it
+			   could be in use */
+			if (prev)
+				prev->next = tpeer->next;
+			else
+				tpeers = tpeer->next;
+			drop = tpeer;
+		} else {
+			res = send_trunk(tpeer, &now);
 			if (iaxtrunkdebug)
-				ast_verbose("Processed trunk peer '%s' (%s:%d) with %d call(s)\n", peer->name, inet_ntoa(peer->addr.sin_addr), ntohs(peer->addr.sin_port), res);
-			totalcalls += res;	
-			res = 0;
-		}
-		peer = peer->next;
+				ast_verbose("Processed trunk peer (%s:%d) with %d call(s)\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), res);
+		}		
+		totalcalls += res;	
+		res = 0;
+		ast_mutex_unlock(&tpeer->lock);
+		prev = tpeer;
+		tpeer = tpeer->next;
+	}
+	ast_mutex_unlock(&tpeerlock);
+	if (drop) {
+		ast_mutex_lock(&drop->lock);
+		/* Once we have this lock, we're sure nobody else is using it or could use it once we release it, 
+		   because by the time they could get tpeerlock, we've already grabbed it */
+		ast_log(LOG_DEBUG, "Dropping unused iax2 trunk peer '%s:%d'\n", inet_ntoa(drop->addr.sin_addr), ntohs(drop->addr.sin_port));
+		free(drop->trunkdata);
+		ast_mutex_unlock(&drop->lock);
+		free(drop);
+		
 	}
-	ast_mutex_unlock(&peerl.lock);
 	if (iaxtrunkdebug)
 		ast_verbose("Ending trunk processing with %d peers and %d calls processed\n", processed, totalcalls);
 	iaxtrunkdebug =0;
@@ -4466,6 +4546,7 @@
 	return -1;
 }
 
+
 static int socket_read(int *id, int fd, short events, void *cbdata)
 {
 	struct sockaddr_in sin;
@@ -4488,6 +4569,7 @@
 	struct ast_channel *c;
 	struct iax2_dpcache *dp;
 	struct iax2_peer *peer;
+	struct iax2_trunk_peer *tpeer;
 	struct iax_ies ies;
 	struct iax_ie_data ied0, ied1;
 	int format;
@@ -4526,21 +4608,16 @@
 			ts = ntohl(mth->ts);
 			res -= (sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr));
 			ptr = mth->data;
-			ast_mutex_lock(&peerl.lock);
-			peer = peerl.peers;
-			while(peer) {
-				if (!inaddrcmp(&peer->addr, &sin))
-					break;
-				peer = peer->next;
-			}
-			ast_mutex_unlock(&peerl.lock);
-			if (!peer) {
+			tpeer = find_tpeer(&sin);
+			if (!tpeer) {
 				ast_log(LOG_WARNING, "Unable to accept trunked packet from '%s:%d': No matching peer\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
 				return 1;
 			}
-			if (!ts || (!peer->rxtrunktime.tv_sec && !peer->rxtrunktime.tv_usec)) {
-				gettimeofday(&peer->rxtrunktime, NULL);
-			}
+			if (!ts || (!tpeer->rxtrunktime.tv_sec && !tpeer->rxtrunktime.tv_usec)) {
+				gettimeofday(&tpeer->rxtrunktime, NULL);
+				tpeer->trunkact = tpeer->rxtrunktime;
+			} else
+				gettimeofday(&tpeer->trunkact, NULL);
 			while(res >= sizeof(struct ast_iax2_meta_trunk_entry)) {
 				/* Process channels */
 				mte = (struct ast_iax2_meta_trunk_entry *)ptr;
@@ -4566,7 +4643,7 @@
 									f.data = ptr;
 								else
 									f.data = NULL;
-								fr.ts = fix_peerts(peer, fr.callno, ts);
+								fr.ts = fix_peerts(tpeer, fr.callno, ts);
 								/* Don't pass any packets until we're started */
 								if ((iaxs[fr.callno]->state & IAX_STATE_STARTED)) {
 									/* Common things */
@@ -4602,6 +4679,7 @@
 				ptr += len;
 				res -= len;
 			}
+			ast_mutex_unlock(&tpeer->lock);
 			
 		}
 		return 1;
@@ -5746,7 +5824,6 @@
 		memset(peer, 0, sizeof(struct iax2_peer));
 		peer->expire = -1;
 		peer->pokeexpire = -1;
-		peer->lastsent = 999999;
 	}
 	if (peer) {
 		if (!found) {




More information about the svn-commits mailing list