[Asterisk-cvs] asterisk/channels chan_iax2.c,1.131,1.132
markster at lists.digium.com
markster at lists.digium.com
Tue Apr 27 11:12:39 CDT 2004
Update of /usr/cvsroot/asterisk/channels
In directory mongoose.digium.com:/tmp/cvs-serv3314/channels
Modified Files:
chan_iax2.c
Log Message:
Extreme IAX2 trunking performance improvements
Index: chan_iax2.c
===================================================================
RCS file: /usr/cvsroot/asterisk/channels/chan_iax2.c,v
retrieving revision 1.131
retrieving revision 1.132
diff -u -d -r1.131 -r1.132
--- chan_iax2.c 26 Apr 2004 11:38:21 -0000 1.131
+++ chan_iax2.c 27 Apr 2004 15:18:55 -0000 1.132
@@ -230,10 +230,6 @@
int delme; /* I need to be deleted */
int temponly; /* I'm only a temp */
int trunk; /* Treat as an IAX trunking */
- struct timeval txtrunktime; /* Transmit trunktime */
- struct timeval rxtrunktime; /* Receive trunktime */
- struct timeval lasttxtime; /* Last transmitted trunktime */
- unsigned int lastsent; /* Last sent time */
/* Qualification */
int callno; /* Call number of POKE request */
@@ -246,6 +242,28 @@
int notransfer;
};
+#define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr))
+
+static struct iax2_trunk_peer {
+ ast_mutex_t lock;
+ struct sockaddr_in addr;
+ struct timeval txtrunktime; /* Transmit trunktime */
+ struct timeval rxtrunktime; /* Receive trunktime */
+ struct timeval lasttxtime; /* Last transmitted trunktime */
+ struct timeval trunkact; /* Last trunk activity */
+ unsigned int lastsent; /* Last sent time */
+ /* Trunk data and length */
+ unsigned char *trunkdata;
+ unsigned int trunkdatalen;
+ unsigned int trunkdataalloc;
+ struct iax2_trunk_peer *next;
+ int trunkerror;
+ int calls;
+ int firstcallno;
+} *tpeers = NULL;
+
+static ast_mutex_t tpeerlock = AST_MUTEX_INITIALIZER;
+
struct iax_firmware {
struct iax_firmware *next;
int fd;
@@ -290,7 +308,8 @@
#define MAX_RETRY_TIME 10000
#define MAX_JITTER_BUFFER 50
-#define MAX_TRUNKDATA 640 /* 40ms, uncompressed linear */
+#define DEFAULT_TRUNKDATA 640 * 10 /* 40ms, uncompressed linear * 10 channels */
+#define MAX_TRUNKDATA 640 * 200 /* 40ms, uncompressed linear * 200 channels */
/* If we have more than this much excess real jitter buffer, srhink it. */
static int max_jitter_buffer = MAX_JITTER_BUFFER;
@@ -426,10 +445,6 @@
int amaflags;
/* This is part of a trunk interface */
int trunk;
- /* Trunk data and length */
- unsigned char trunkdata[MAX_TRUNKDATA];
- unsigned int trunkdatalen;
- int trunkerror;
struct iax2_dpcache *dpentries;
int notransfer; /* do we want native bridging */
};
@@ -2530,39 +2545,38 @@
return tmp;
}
-static unsigned int calc_txpeerstamp(struct iax2_peer *peer, int sampms)
+static unsigned int calc_txpeerstamp(struct iax2_trunk_peer *tpeer, int sampms, struct timeval *tv)
{
- struct timeval tv;
long int mssincetx;
long int ms, pred;
- gettimeofday(&tv, NULL);
- mssincetx = (tv.tv_sec - peer->lasttxtime.tv_sec) * 1000 + (tv.tv_usec - peer->lasttxtime.tv_usec) / 1000;
+ tpeer->trunkact = *tv;
+ mssincetx = (tv->tv_sec - tpeer->lasttxtime.tv_sec) * 1000 + (tv->tv_usec - tpeer->lasttxtime.tv_usec) / 1000;
if (mssincetx > 5000) {
/* If it's been at least 5 seconds since the last time we transmitted on this trunk, reset our timers */
- peer->txtrunktime.tv_sec = tv.tv_sec;
- peer->txtrunktime.tv_usec = tv.tv_usec;
- peer->lastsent = 999999;
+ tpeer->txtrunktime.tv_sec = tv->tv_sec;
+ tpeer->txtrunktime.tv_usec = tv->tv_usec;
+ tpeer->lastsent = 999999;
}
/* Update last transmit time now */
- peer->lasttxtime.tv_sec = tv.tv_sec;
- peer->lasttxtime.tv_usec = tv.tv_usec;
+ tpeer->lasttxtime.tv_sec = tv->tv_sec;
+ tpeer->lasttxtime.tv_usec = tv->tv_usec;
/* Calculate ms offset */
- ms = (tv.tv_sec - peer->txtrunktime.tv_sec) * 1000 + (tv.tv_usec - peer->txtrunktime.tv_usec) / 1000;
+ ms = (tv->tv_sec - tpeer->txtrunktime.tv_sec) * 1000 + (tv->tv_usec - tpeer->txtrunktime.tv_usec) / 1000;
/* Predict from last value */
- pred = peer->lastsent + sampms;
+ pred = tpeer->lastsent + sampms;
if (abs(ms - pred) < 640)
ms = pred;
/* We never send the same timestamp twice, so fudge a little if we must */
- if (ms == peer->lastsent)
- ms = peer->lastsent + 1;
- peer->lastsent = ms;
+ if (ms == tpeer->lastsent)
+ ms = tpeer->lastsent + 1;
+ tpeer->lastsent = ms;
return ms;
}
-static unsigned int fix_peerts(struct iax2_peer *peer, int callno, unsigned int ts)
+static unsigned int fix_peerts(struct iax2_trunk_peer *peer, int callno, unsigned int ts)
{
long ms; /* NOT unsigned */
if (!iaxs[callno]->rxcore.tv_sec && !iaxs[callno]->rxcore.tv_usec) {
@@ -2677,6 +2691,85 @@
return ms;
}
+struct iax2_trunk_peer *find_tpeer(struct sockaddr_in *sin)
+{
+ struct iax2_trunk_peer *tpeer;
+ /* Find the trunk peer for this address, creating it if needed; returned locked (NULL if allocation fails) */
+ ast_mutex_lock(&tpeerlock);
+ tpeer = tpeers;
+ while(tpeer) {
+ /* We don't lock here because tpeer->addr *never* changes */
+ if (!inaddrcmp(&tpeer->addr, sin)) {
+ ast_mutex_lock(&tpeer->lock);
+ break;
+ }
+ tpeer = tpeer->next;
+ }
+ if (!tpeer) {
+ tpeer = malloc(sizeof(struct iax2_trunk_peer));
+ if (tpeer) {
+ memset(tpeer, 0, sizeof(struct iax2_trunk_peer));
+ ast_mutex_init(&tpeer->lock);
+ tpeer->lastsent = 9999;
+ memcpy(&tpeer->addr, sin, sizeof(tpeer->addr));
+ gettimeofday(&tpeer->trunkact, NULL);
+ ast_mutex_lock(&tpeer->lock);
+ tpeer->next = tpeers;
+ tpeers = tpeer;
+ ast_log(LOG_DEBUG, "Created trunk peer for '%s:%d'\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+ }
+ }
+ ast_mutex_unlock(&tpeerlock);
+ return tpeer;
+}
+
+static int iax2_trunk_queue(struct chan_iax2_pvt *pvt, struct ast_frame *f)
+{
+ struct iax2_trunk_peer *tpeer;
+ void *tmp, *ptr;
+ struct ast_iax2_meta_trunk_entry *met;
+ tpeer = find_tpeer(&pvt->addr);
+ if (tpeer) {
+ if (tpeer->trunkdatalen + f->datalen + 4 >= tpeer->trunkdataalloc) {
+ /* Need to reallocate space */
+ if (tpeer->trunkdataalloc < MAX_TRUNKDATA) {
+ tmp = realloc(tpeer->trunkdata, tpeer->trunkdataalloc + DEFAULT_TRUNKDATA + IAX2_TRUNK_PREFACE);
+ if (tmp) {
+ tpeer->trunkdataalloc += DEFAULT_TRUNKDATA;
+ tpeer->trunkdata = tmp;
+ ast_log(LOG_DEBUG, "Expanded trunk '%s:%d' to %d bytes\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), tpeer->trunkdataalloc);
+ } else {
+ ast_log(LOG_WARNING, "Insufficient memory to expand trunk data to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+ ast_mutex_unlock(&tpeer->lock);
+ return -1;
+ }
+ } else {
+ ast_log(LOG_WARNING, "Maximum trunk data space exceeded to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+ ast_mutex_unlock(&tpeer->lock);
+ return -1;
+ }
+ }
+
+ /* Append to meta frame */
+ ptr = tpeer->trunkdata + IAX2_TRUNK_PREFACE + tpeer->trunkdatalen;
+ met = (struct ast_iax2_meta_trunk_entry *)ptr;
+ /* Store call number and length in meta header */
+ met->callno = htons(pvt->callno);
+ met->len = htons(f->datalen);
+ /* Advance pointer/increase queued length past trunk entry header */
+ ptr += sizeof(struct ast_iax2_meta_trunk_entry);
+ tpeer->trunkdatalen += sizeof(struct ast_iax2_meta_trunk_entry);
+ /* Copy actual trunk data */
+ memcpy(ptr, f->data, f->datalen);
+ tpeer->trunkdatalen += f->datalen;
+ if (!tpeer->firstcallno)
+ tpeer->firstcallno = pvt->callno;
+ tpeer->calls++;
+ ast_mutex_unlock(&tpeer->lock);
+ }
+ return 0;
+}
+
static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned int ts, int seqno, int now, int transfer, int final)
{
/* Queue a packet for delivery on a given private structure. Use "ts" for
@@ -2794,17 +2887,7 @@
res = iax2_transmit(fr);
} else {
if (pvt->trunk) {
- /* Queue for transmission in a meta frame */
- if ((sizeof(pvt->trunkdata) - pvt->trunkdatalen) >= fr->af.datalen) {
- memcpy(pvt->trunkdata + pvt->trunkdatalen, fr->af.data, fr->af.datalen);
- pvt->trunkdatalen += fr->af.datalen;
- res = 0;
- pvt->trunkerror = 0;
- } else {
- if (!pvt->trunkerror)
- ast_log(LOG_WARNING, "Out of trunk data space on call number %d, dropping\n", pvt->callno);
- pvt->trunkerror = 1;
- }
+ iax2_trunk_queue(pvt, &fr->af);
res = 0;
} else if (fr->af.frametype == AST_FRAME_VIDEO) {
/* Video frame have no sequence number */
@@ -4192,96 +4275,70 @@
return 0;
}
-static int send_trunk(struct iax2_peer *peer)
+static int send_trunk(struct iax2_trunk_peer *tpeer, struct timeval *now)
{
- int x;
- int calls = 0;
int res = 0;
- int firstcall = 0;
- unsigned char buf[65536 + sizeof(struct iax_frame)], *ptr;
- int len = 65536;
struct iax_frame *fr;
struct ast_iax2_meta_hdr *meta;
struct ast_iax2_meta_trunk_hdr *mth;
- struct ast_iax2_meta_trunk_entry *met;
+ int calls = 0;
/* Point to frame */
- fr = (struct iax_frame *)buf;
+ fr = (struct iax_frame *)tpeer->trunkdata;
/* Point to meta data */
meta = (struct ast_iax2_meta_hdr *)fr->afdata;
mth = (struct ast_iax2_meta_trunk_hdr *)meta->data;
- /* Point past meta data for first meta trunk entry */
- ptr = fr->afdata + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
- len -= sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
-
- /* Search through trunked calls for a match with this peer */
- for (x=TRUNK_CALL_START;x<maxtrunkcall; x++) {
- ast_mutex_lock(&iaxsl[x]);
-#if 0
- if (iaxtrunkdebug)
- ast_verbose("Call %d is at %s:%d (%d)\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port), iaxs[x]->addr.sin_family);
-#endif
- if (iaxs[x] && iaxs[x]->trunk && iaxs[x]->trunkdatalen && !inaddrcmp(&iaxs[x]->addr, &peer->addr)) {
- if (iaxtrunkdebug)
- ast_verbose(" -- Sending call %d via trunk to %s:%d\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port));
- if (len >= iaxs[x]->trunkdatalen + sizeof(struct ast_iax2_meta_trunk_entry)) {
- met = (struct ast_iax2_meta_trunk_entry *)ptr;
- /* Store call number and length in meta header */
- met->callno = htons(x);
- met->len = htons(iaxs[x]->trunkdatalen);
- /* Advance pointers/decrease length past trunk entry header */
- ptr += sizeof(struct ast_iax2_meta_trunk_entry);
- len -= sizeof(struct ast_iax2_meta_trunk_entry);
- /* Copy actual trunk data */
- memcpy(ptr, iaxs[x]->trunkdata, iaxs[x]->trunkdatalen);
- /* Advance pointeres/decrease length for actual data */
- ptr += iaxs[x]->trunkdatalen;
- len -= iaxs[x]->trunkdatalen;
- } else
- ast_log(LOG_WARNING, "Out of space in frame for trunking call %d\n", x);
- iaxs[x]->trunkdatalen = 0;
- calls++;
- if (!firstcall)
- firstcall = x;
- }
- ast_mutex_unlock(&iaxsl[x]);
- }
- if (calls) {
+ if (tpeer->trunkdatalen) {
/* We're actually sending a frame, so fill the meta trunk header and meta header */
meta->zeros = 0;
meta->metacmd = IAX_META_TRUNK;
meta->cmddata = 0;
- mth->ts = htonl(calc_txpeerstamp(peer, trunkfreq));
+ mth->ts = htonl(calc_txpeerstamp(tpeer, trunkfreq, now));
/* And the rest of the ast_iax2 header */
fr->direction = DIRECTION_OUTGRESS;
fr->retrans = -1;
fr->transfer = 0;
/* Any appropriate call will do */
- fr->callno = firstcall;
+ fr->callno = tpeer->firstcallno;
fr->data = fr->afdata;
- fr->datalen = 65536 - len;
+ fr->datalen = tpeer->trunkdatalen + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
#if 0
ast_log(LOG_DEBUG, "Trunking %d calls in %d bytes, ts=%d\n", calls, fr->datalen, ntohl(mth->ts));
#endif
res = send_packet(fr);
+ calls = tpeer->calls;
+ /* Reset transmit trunk side data */
+ tpeer->trunkdatalen = 0;
+ tpeer->calls = 0;
+ tpeer->firstcallno = 0;
}
if (res < 0)
return res;
return calls;
}
+static inline int iax2_trunk_expired(struct iax2_trunk_peer *tpeer, struct timeval *now)
+{
+ /* Drop when trunk is about 5 seconds idle */
+ if (now->tv_sec > tpeer->trunkact.tv_sec + 5)
+ return 1;
+ return 0;
+}
+
static int timing_read(int *id, int fd, short events, void *cbdata)
{
char buf[1024];
int res;
- struct iax2_peer *peer;
+ struct iax2_trunk_peer *tpeer, *prev = NULL, *drop=NULL;
int processed = 0;
int totalcalls = 0;
#ifdef ZT_TIMERACK
int x = 1;
#endif
+ struct timeval now;
if (iaxtrunkdebug)
ast_verbose("Beginning trunk processing\n");
+ gettimeofday(&now, NULL);
if (events & AST_IO_PRI) {
#ifdef ZT_TIMERACK
/* Great, this is a timing interface, just call the ioctl */
@@ -4299,20 +4356,43 @@
}
}
/* For each peer that supports trunking... */
- ast_mutex_lock(&peerl.lock);
- peer = peerl.peers;
- while(peer) {
- if (peer->trunk) {
- processed++;
- res = send_trunk(peer);
+ ast_mutex_lock(&tpeerlock);
+ tpeer = tpeers;
+ while(tpeer) {
+ processed++;
+ ast_mutex_lock(&tpeer->lock);
+ /* We can drop a single tpeer per pass. That makes all this logic
+ substantially easier */
+ if (!drop && iax2_trunk_expired(tpeer, &now)) {
+ /* Take it out of the list, but don't free it yet, because it
+ could be in use */
+ if (prev)
+ prev->next = tpeer->next;
+ else
+ tpeers = tpeer->next;
+ drop = tpeer;
+ } else {
+ res = send_trunk(tpeer, &now);
if (iaxtrunkdebug)
- ast_verbose("Processed trunk peer '%s' (%s:%d) with %d call(s)\n", peer->name, inet_ntoa(peer->addr.sin_addr), ntohs(peer->addr.sin_port), res);
- totalcalls += res;
- res = 0;
- }
- peer = peer->next;
+ ast_verbose("Processed trunk peer (%s:%d) with %d call(s)\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), res);
+ }
+ totalcalls += res;
+ res = 0;
+ ast_mutex_unlock(&tpeer->lock);
+ prev = tpeer;
+ tpeer = tpeer->next;
+ }
+ ast_mutex_unlock(&tpeerlock);
+ if (drop) {
+ ast_mutex_lock(&drop->lock);
+ /* Once we have this lock, we're sure nobody else is using it, and nobody can start using it once we release it,
+ because it was already unlinked from tpeers while we held tpeerlock, so find_tpeer can no longer return it */
+ ast_log(LOG_DEBUG, "Dropping unused iax2 trunk peer '%s:%d'\n", inet_ntoa(drop->addr.sin_addr), ntohs(drop->addr.sin_port));
+ free(drop->trunkdata);
+ ast_mutex_unlock(&drop->lock);
+ free(drop);
+
}
- ast_mutex_unlock(&peerl.lock);
if (iaxtrunkdebug)
ast_verbose("Ending trunk processing with %d peers and %d calls processed\n", processed, totalcalls);
iaxtrunkdebug =0;
@@ -4466,6 +4546,7 @@
return -1;
}
+
static int socket_read(int *id, int fd, short events, void *cbdata)
{
struct sockaddr_in sin;
@@ -4488,6 +4569,7 @@
struct ast_channel *c;
struct iax2_dpcache *dp;
struct iax2_peer *peer;
+ struct iax2_trunk_peer *tpeer;
struct iax_ies ies;
struct iax_ie_data ied0, ied1;
int format;
@@ -4526,21 +4608,16 @@
ts = ntohl(mth->ts);
res -= (sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr));
ptr = mth->data;
- ast_mutex_lock(&peerl.lock);
- peer = peerl.peers;
- while(peer) {
- if (!inaddrcmp(&peer->addr, &sin))
- break;
- peer = peer->next;
- }
- ast_mutex_unlock(&peerl.lock);
- if (!peer) {
+ tpeer = find_tpeer(&sin);
+ if (!tpeer) {
ast_log(LOG_WARNING, "Unable to accept trunked packet from '%s:%d': No matching peer\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
return 1;
}
- if (!ts || (!peer->rxtrunktime.tv_sec && !peer->rxtrunktime.tv_usec)) {
- gettimeofday(&peer->rxtrunktime, NULL);
- }
+ if (!ts || (!tpeer->rxtrunktime.tv_sec && !tpeer->rxtrunktime.tv_usec)) {
+ gettimeofday(&tpeer->rxtrunktime, NULL);
+ tpeer->trunkact = tpeer->rxtrunktime;
+ } else
+ gettimeofday(&tpeer->trunkact, NULL);
while(res >= sizeof(struct ast_iax2_meta_trunk_entry)) {
/* Process channels */
mte = (struct ast_iax2_meta_trunk_entry *)ptr;
@@ -4566,7 +4643,7 @@
f.data = ptr;
else
f.data = NULL;
- fr.ts = fix_peerts(peer, fr.callno, ts);
+ fr.ts = fix_peerts(tpeer, fr.callno, ts);
/* Don't pass any packets until we're started */
if ((iaxs[fr.callno]->state & IAX_STATE_STARTED)) {
/* Common things */
@@ -4602,6 +4679,7 @@
ptr += len;
res -= len;
}
+ ast_mutex_unlock(&tpeer->lock);
}
return 1;
@@ -5746,7 +5824,6 @@
memset(peer, 0, sizeof(struct iax2_peer));
peer->expire = -1;
peer->pokeexpire = -1;
- peer->lastsent = 999999;
}
if (peer) {
if (!found) {
More information about the svn-commits
mailing list