[asterisk-commits] russell: branch russell/iax2_frame_queue r77804 - /team/russell/iax2_frame_qu...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Jul 30 16:41:52 CDT 2007
Author: russell
Date: Mon Jul 30 16:41:52 2007
New Revision: 77804
URL: http://svn.digium.com/view/asterisk?view=rev&rev=77804
Log:
This change converts the single global frame queue to be a queue per call number.
In the previous version of this module, using a single queue wasn't really a big
problem other than taking a little bit longer to traverse the queue.
Now that chan_iax2 does its processing in a much more multithreaded and
asynchronous manner, this single shared resource turns into a bottleneck for
frame processing.
This removes this global shared resources but others still exist. (See my rant
above the find_callno() function for the worst one I have found).
Modified:
team/russell/iax2_frame_queue/channels/chan_iax2.c
Modified: team/russell/iax2_frame_queue/channels/chan_iax2.c
URL: http://svn.digium.com/view/asterisk/team/russell/iax2_frame_queue/channels/chan_iax2.c?view=diff&rev=77804&r1=77803&r2=77804
==============================================================================
--- team/russell/iax2_frame_queue/channels/chan_iax2.c (original)
+++ team/russell/iax2_frame_queue/channels/chan_iax2.c Mon Jul 30 16:41:52 2007
@@ -657,7 +657,7 @@
* on module unload. This is because all active calls are destroyed, and
* all frames in this queue will get destroyed as a part of that process.
*/
-static AST_LIST_HEAD_STATIC(frame_queue, iax_frame);
+static AST_LIST_HEAD(, iax_frame) frame_queue[IAX_MAX_CALLS];
static AST_LIST_HEAD_STATIC(users, iax2_user);
@@ -1956,13 +1956,12 @@
ast_queue_hangup(owner);
}
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+ AST_LIST_LOCK(&frame_queue[pvt->callno]);
+ AST_LIST_TRAVERSE(&frame_queue[pvt->callno], cur, list) {
/* Cancel any pending transmissions */
- if (cur->callno == pvt->callno)
- cur->retries = -1;
- }
- AST_LIST_UNLOCK(&frame_queue);
+ cur->retries = -1;
+ }
+ AST_LIST_UNLOCK(&frame_queue[pvt->callno]);
if (pvt->reg)
pvt->reg->callno = 0;
@@ -2072,9 +2071,12 @@
/* Do not try again */
if (freeme) {
/* Don't attempt delivery, just remove it from the queue */
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_REMOVE(&frame_queue, f, list);
- AST_LIST_UNLOCK(&frame_queue);
+ if (callno) {
+ /* XXX Note that there should never be a frame without a callno ... */
+ AST_LIST_LOCK(&frame_queue[callno]);
+ AST_LIST_REMOVE(&frame_queue[callno], f, list);
+ AST_LIST_UNLOCK(&frame_queue[callno]);
+ }
f->retrans = -1;
/* Free the IAX frame */
iax2_frame_free(f);
@@ -2268,20 +2270,22 @@
static int iax2_show_stats(int fd, int argc, char *argv[])
{
struct iax_frame *cur;
- int cnt = 0, dead=0, final=0;
+ int cnt = 0, dead = 0, final = 0, i;
if (argc != 3)
return RESULT_SHOWUSAGE;
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, cur, list) {
- if (cur->retries < 0)
- dead++;
- if (cur->final)
- final++;
- cnt++;
- }
- AST_LIST_UNLOCK(&frame_queue);
+ for (i = 0; i < IAX_MAX_CALLS; i++) {
+ AST_LIST_LOCK(&frame_queue[i]);
+ AST_LIST_TRAVERSE(&frame_queue[i], cur, list) {
+ if (cur->retries < 0)
+ dead++;
+ if (cur->final)
+ final++;
+ cnt++;
+ }
+ AST_LIST_UNLOCK(&frame_queue[i]);
+ }
ast_cli(fd, " IAX Statistics\n");
ast_cli(fd, "---------------------\n");
@@ -2612,9 +2616,9 @@
/* By setting this to 0, the network thread will send it for us, and
queue retransmission if necessary */
fr->sentyet = 0;
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_INSERT_TAIL(&frame_queue, fr, list);
- AST_LIST_UNLOCK(&frame_queue);
+ AST_LIST_LOCK(&frame_queue[fr->callno]);
+ AST_LIST_INSERT_TAIL(&frame_queue[fr->callno], fr, list);
+ AST_LIST_UNLOCK(&frame_queue[fr->callno]);
/* Wake up the network and scheduler thread */
if (netthreadid != AST_PTHREADT_NULL)
pthread_kill(netthreadid, SIGURG);
@@ -5710,15 +5714,14 @@
pvt->lastsent = 0;
pvt->nextpred = 0;
pvt->pingtime = DEFAULT_RETRY_TIME;
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+ AST_LIST_LOCK(&frame_queue[callno]);
+ AST_LIST_TRAVERSE(&frame_queue[callno], cur, list) {
/* We must cancel any packets that would have been transmitted
because now we're talking to someone new. It's okay, they
were transmitted to someone that didn't care anyway. */
- if (callno == cur->callno)
- cur->retries = -1;
- }
- AST_LIST_UNLOCK(&frame_queue);
+ cur->retries = -1;
+ }
+ AST_LIST_UNLOCK(&frame_queue[callno]);
return 0;
}
@@ -6258,15 +6261,13 @@
{
struct iax_frame *f;
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, f, list) {
+ AST_LIST_LOCK(&frame_queue[callno]);
+ AST_LIST_TRAVERSE(&frame_queue[callno], f, list) {
/* Send a copy immediately */
- if ((f->callno == callno) && iaxs[f->callno] &&
- ((unsigned char ) (f->oseqno - last) < 128)) {
+ if (iaxs[callno] && ((unsigned char ) (f->oseqno - last) < 128))
send_packet(f);
- }
- }
- AST_LIST_UNLOCK(&frame_queue);
+ }
+ AST_LIST_UNLOCK(&frame_queue[callno]);
}
static void __iax2_poke_peer_s(void *data)
@@ -7200,17 +7201,17 @@
if (iaxdebug)
ast_debug(1, "Cancelling transmission of packet %d\n", x);
call_to_destroy = 0;
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+ AST_LIST_LOCK(&frame_queue[fr->callno]);
+ AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
/* If it's our call, and our timestamp, mark -1 retries */
- if ((fr->callno == cur->callno) && (x == cur->oseqno)) {
+ if (x == cur->oseqno) {
cur->retries = -1;
/* Destroy call if this is the end */
if (cur->final)
call_to_destroy = fr->callno;
}
}
- AST_LIST_UNLOCK(&frame_queue);
+ AST_LIST_UNLOCK(&frame_queue[fr->callno]);
if (call_to_destroy) {
if (iaxdebug)
ast_debug(1, "Really destroying %d, having been acked on final message\n", call_to_destroy);
@@ -7409,13 +7410,13 @@
case IAX_COMMAND_TXACC:
if (iaxs[fr->callno]->transferring == TRANSFER_BEGIN) {
/* Ack the packet with the given timestamp */
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+ AST_LIST_LOCK(&frame_queue[fr->callno]);
+ AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
/* Cancel any outstanding txcnt's */
- if ((fr->callno == cur->callno) && (cur->transfer))
+ if (cur->transfer)
cur->retries = -1;
}
- AST_LIST_UNLOCK(&frame_queue);
+ AST_LIST_UNLOCK(&frame_queue[fr->callno]);
memset(&ied1, 0, sizeof(ied1));
iax_ie_append_short(&ied1, IAX_IE_CALLNO, iaxs[fr->callno]->callno);
send_command(iaxs[fr->callno], AST_FRAME_IAX, IAX_COMMAND_TXREADY, 0, ied1.buf, ied1.pos, -1);
@@ -8198,13 +8199,13 @@
break;
case IAX_COMMAND_TXMEDIA:
if (iaxs[fr->callno]->transferring == TRANSFER_READY) {
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+ AST_LIST_LOCK(&frame_queue[fr->callno]);
+ AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
/* Cancel any outstanding frames and start anew */
- if ((fr->callno == cur->callno) && (cur->transfer))
+ if (cur->transfer)
cur->retries = -1;
}
- AST_LIST_UNLOCK(&frame_queue);
+ AST_LIST_UNLOCK(&frame_queue[fr->callno]);
/* Start sending our media to the transfer address, but otherwise leave the call as-is */
iaxs[fr->callno]->transferring = TRANSFER_MEDIAPASS;
}
@@ -8811,25 +8812,14 @@
return NULL;
}
-static void *network_thread(void *ignore)
-{
- /* Our job is simple: Send queued messages, retrying if necessary. Read frames
- from the network, and queue them for delivery to the channels */
- int res, count, wakeup;
+static int transmit_queued_frames(void)
+{
+ int wakeup = -1, count = 0, i;
struct iax_frame *f;
- if (timingfd > -1)
- ast_io_add(io, timingfd, timing_read, AST_IO_IN | AST_IO_PRI, NULL);
-
- for(;;) {
- pthread_testcancel();
-
- /* Go through the queue, sending messages which have not yet been
- sent, and scheduling retransmissions if appropriate */
- AST_LIST_LOCK(&frame_queue);
- count = 0;
- wakeup = -1;
- AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue, f, list) {
+ for (i = 0; i < IAX_MAX_CALLS; i++) {
+ AST_LIST_LOCK(&frame_queue[i]);
+ AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue[i], f, list) {
if (f->sentyet)
continue;
@@ -8838,19 +8828,19 @@
wakeup = 1;
continue;
}
-
+
f->sentyet = 1;
-
+
if (iaxs[f->callno]) {
send_packet(f);
count++;
}
-
+
ast_mutex_unlock(&iaxsl[f->callno]);
-
+
if (f->retries < 0) {
/* This is not supposed to be retransmitted */
- AST_LIST_REMOVE_CURRENT(&frame_queue, list);
+ AST_LIST_REMOVE_CURRENT(&frame_queue[i], list);
/* Free the iax frame */
iax_frame_free(f);
} else {
@@ -8861,11 +8851,33 @@
}
}
AST_LIST_TRAVERSE_SAFE_END
- AST_LIST_UNLOCK(&frame_queue);
-
+ AST_LIST_UNLOCK(&frame_queue[i]);
+ }
+
+ if (count >= 20)
+ ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
+
+ return wakeup;
+}
+
+
+static void *network_thread(void *ignore)
+{
+ /* Our job is simple: Send queued messages, retrying if necessary. Read frames
+ from the network, and queue them for delivery to the channels */
+ int res, wakeup;
+
+ if (timingfd > -1)
+ ast_io_add(io, timingfd, timing_read, AST_IO_IN | AST_IO_PRI, NULL);
+
+ for(;;) {
pthread_testcancel();
- if (count >= 20)
- ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
+
+ /* Go through the queue, sending messages which have not yet been
+ sent, and scheduling retransmissions if appropriate */
+ wakeup = transmit_queued_frames();
+
+ pthread_testcancel();
/* Now do the IO, and run scheduled tasks */
res = ast_io_wait(io, wakeup);
@@ -10772,12 +10784,10 @@
/* Grab the sched lock resource to keep it away from threads about to die */
/* Cancel the network thread, close the net socket */
if (netthreadid != AST_PTHREADT_NULL) {
- AST_LIST_LOCK(&frame_queue);
ast_mutex_lock(&sched_lock);
pthread_cancel(netthreadid);
ast_cond_signal(&sched_cond);
ast_mutex_unlock(&sched_lock); /* Release the schedule lock resource */
- AST_LIST_UNLOCK(&frame_queue);
pthread_join(netthreadid, NULL);
}
if (schedthreadid != AST_PTHREADT_NULL) {
@@ -10833,8 +10843,10 @@
iax_provision_unload();
sched_context_destroy(sched);
- for (x = 0; x < IAX_MAX_CALLS; x++)
+ for (x = 0; x < IAX_MAX_CALLS; x++) {
ast_mutex_destroy(&iaxsl[x]);
+ AST_LIST_HEAD_DESTROY(&frame_queue[x]);
+ }
return 0;
}
@@ -10874,8 +10886,10 @@
memset(iaxs, 0, sizeof(iaxs));
- for (x=0;x<IAX_MAX_CALLS;x++)
+ for (x = 0; x < IAX_MAX_CALLS; x++) {
ast_mutex_init(&iaxsl[x]);
+ AST_LIST_HEAD_INIT(&frame_queue[x]);
+ }
ast_cond_init(&sched_cond, NULL);
More information about the asterisk-commits
mailing list