[asterisk-commits] russell: branch russell/iax2_transmit_q r184836 - /team/russell/iax2_transmit_q/channels/chan_iax2.c
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sat Mar 28 22:36:50 CDT 2009
Author: russell
Date: Sat Mar 28 22:36:47 2009
New Revision: 184836
URL: http://svn.digium.com/svn-view/asterisk?view=rev&rev=184836
Log:
add patch for transmit queue optimization.
Modified:
team/russell/iax2_transmit_q/channels/chan_iax2.c
Modified: team/russell/iax2_transmit_q/channels/chan_iax2.c
URL: http://svn.digium.com/svn-view/asterisk/team/russell/iax2_transmit_q/channels/chan_iax2.c?view=diff&rev=184836&r1=184835&r2=184836
==============================================================================
--- team/russell/iax2_transmit_q/channels/chan_iax2.c (original)
+++ team/russell/iax2_transmit_q/channels/chan_iax2.c Sat Mar 28 22:36:47 2009
@@ -88,6 +88,7 @@
#include "asterisk/event.h"
#include "asterisk/astobj2.h"
#include "asterisk/timing.h"
+#include "asterisk/taskprocessor.h"
#include "iax2.h"
#include "iax2-parser.h"
@@ -761,6 +762,8 @@
*/
static AST_LIST_HEAD_STATIC(frame_queue, iax_frame);
+static struct ast_taskprocessor *transmit_processor;
+
/*!
* This module will get much higher performance when doing a lot of
* user and peer lookups if the number of buckets is increased from 1.
@@ -3304,23 +3307,38 @@
return 0;
}
+static int transmit_frame(void *data)
+{
+ struct iax_frame *fr = data;
+
+ ast_mutex_lock(&iaxsl[fr->callno]);
+ fr->sentyet = 1;
+ if (iaxs[fr->callno]) {
+ send_packet(fr);
+ }
+ ast_mutex_unlock(&iaxsl[fr->callno]);
+
+ if (fr->retries < 0) {
+ /* No retransmit requested */
+ iax_frame_free(fr);
+ } else {
+ /* We need reliable delivery. Schedule a retransmission */
+ AST_LIST_LOCK(&frame_queue);
+ AST_LIST_INSERT_TAIL(&frame_queue, fr, list);
+ AST_LIST_UNLOCK(&frame_queue);
+ fr->retries++;
+ fr->retrans = iax2_sched_add(sched, fr->retrytime, attempt_transmit, fr);
+ }
+
+ return 0;
+}
+
static int iax2_transmit(struct iax_frame *fr)
{
- /* Lock the queue and place this packet at the end */
- /* By setting this to 0, the network thread will send it for us, and
- queue retransmission if necessary */
fr->sentyet = 0;
- AST_LIST_LOCK(&frame_queue);
- AST_LIST_INSERT_TAIL(&frame_queue, fr, list);
- AST_LIST_UNLOCK(&frame_queue);
- /* Wake up the network and scheduler thread */
- if (netthreadid != AST_PTHREADT_NULL)
- pthread_kill(netthreadid, SIGURG);
- ast_sched_thread_poke(sched);
- return 0;
-}
-
-
+
+ return ast_taskprocessor_push(transmit_processor, transmit_frame, fr);
+}
static int iax2_digit_begin(struct ast_channel *c, char digit)
{
@@ -10495,66 +10513,18 @@
static void *network_thread(void *ignore)
{
- /* Our job is simple: Send queued messages, retrying if necessary. Read frames
- from the network, and queue them for delivery to the channels */
- int res, count, wakeup;
- struct iax_frame *f;
-
- if (timer)
+ if (timer) {
ast_io_add(io, ast_timer_fd(timer), timing_read, AST_IO_IN | AST_IO_PRI, NULL);
-
- for(;;) {
+ }
+
+ for (;;) {
pthread_testcancel();
-
- /* Go through the queue, sending messages which have not yet been
- sent, and scheduling retransmissions if appropriate */
- AST_LIST_LOCK(&frame_queue);
- count = 0;
- wakeup = -1;
- AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue, f, list) {
- if (f->sentyet)
- continue;
-
- /* Try to lock the pvt, if we can't... don't fret - defer it till later */
- if (ast_mutex_trylock(&iaxsl[f->callno])) {
- wakeup = 1;
- continue;
- }
-
- f->sentyet = 1;
-
- if (iaxs[f->callno]) {
- send_packet(f);
- count++;
- }
-
- ast_mutex_unlock(&iaxsl[f->callno]);
-
- if (f->retries < 0) {
- /* This is not supposed to be retransmitted */
- AST_LIST_REMOVE_CURRENT(list);
- /* Free the iax frame */
- iax_frame_free(f);
- } else {
- /* We need reliable delivery. Schedule a retransmission */
- f->retries++;
- f->retrans = iax2_sched_add(sched, f->retrytime, attempt_transmit, f);
- }
- }
- AST_LIST_TRAVERSE_SAFE_END;
- AST_LIST_UNLOCK(&frame_queue);
-
- pthread_testcancel();
- if (count >= 20)
- ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
-
- /* Now do the IO, and run scheduled tasks */
- res = ast_io_wait(io, wakeup);
- if (res >= 0) {
- if (res >= 20)
- ast_debug(1, "chan_iax2: ast_io_wait ran %d I/Os all at once\n", res);
- }
- }
+ /* Wake up once a second just in case SIGURG was sent while
+ * we weren't in poll(), to make sure we don't hang when trying
+ * to unload. */
+ ast_io_wait(io, 1000);
+ }
+
return NULL;
}
@@ -12426,19 +12396,14 @@
struct ast_context *con;
int x;
- /* Make sure threads do not hold shared resources when they are canceled */
-
- /* Grab the sched lock resource to keep it away from threads about to die */
- /* Cancel the network thread, close the net socket */
if (netthreadid != AST_PTHREADT_NULL) {
- AST_LIST_LOCK(&frame_queue);
pthread_cancel(netthreadid);
- AST_LIST_UNLOCK(&frame_queue);
+ pthread_kill(netthreadid, SIGURG);
pthread_join(netthreadid, NULL);
}
sched = ast_sched_thread_destroy(sched);
-
+
/* Call for all threads to halt */
AST_LIST_LOCK(&idle_list);
while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, list)))
@@ -12489,6 +12454,7 @@
if (timer) {
ast_timer_close(timer);
}
+ transmit_processor = ast_taskprocessor_unreference(transmit_processor);
con = ast_context_find(regcontext);
if (con)
@@ -12557,19 +12523,23 @@
struct iax2_registry *reg = NULL;
peers = ao2_container_alloc(MAX_PEER_BUCKETS, peer_hash_cb, peer_cmp_cb);
- if (!peers)
+ if (!peers) {
return AST_MODULE_LOAD_FAILURE;
+ }
+
users = ao2_container_alloc(MAX_USER_BUCKETS, user_hash_cb, user_cmp_cb);
if (!users) {
ao2_ref(peers, -1);
return AST_MODULE_LOAD_FAILURE;
}
+
iax_peercallno_pvts = ao2_container_alloc(IAX_MAX_CALLS, pvt_hash_cb, pvt_cmp_cb);
if (!iax_peercallno_pvts) {
ao2_ref(peers, -1);
ao2_ref(users, -1);
return AST_MODULE_LOAD_FAILURE;
}
+
iax_transfercallno_pvts = ao2_container_alloc(IAX_MAX_CALLS, transfercallno_pvt_hash_cb, transfercallno_pvt_cmp_cb);
if (!iax_transfercallno_pvts) {
ao2_ref(peers, -1);
@@ -12577,6 +12547,16 @@
ao2_ref(iax_peercallno_pvts, -1);
return AST_MODULE_LOAD_FAILURE;
}
+
+ transmit_processor = ast_taskprocessor_get("iax2_transmit", TPS_REF_DEFAULT);
+ if (!transmit_processor) {
+ ao2_ref(peers, -1);
+ ao2_ref(users, -1);
+ ao2_ref(iax_peercallno_pvts, -1);
+ ao2_ref(iax_transfercallno_pvts, -1);
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
ast_custom_function_register(&iaxpeer_function);
ast_custom_function_register(&iaxvar_function);
More information about the asterisk-commits mailing list