[asterisk-commits] russell: branch russell/sched_thread r143121 - /team/russell/sched_thread/cha...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sun Sep 14 19:12:55 CDT 2008
Author: russell
Date: Sun Sep 14 19:12:54 2008
New Revision: 143121
URL: http://svn.digium.com/view/asterisk?view=rev&rev=143121
Log:
Convert chan_sip to use a pool of task processors for processing incoming UDP packets.
Modified:
team/russell/sched_thread/channels/chan_sip.c
Modified: team/russell/sched_thread/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/channels/chan_sip.c?view=diff&rev=143121&r1=143120&r2=143121
==============================================================================
--- team/russell/sched_thread/channels/chan_sip.c (original)
+++ team/russell/sched_thread/channels/chan_sip.c Sun Sep 14 19:12:54 2008
@@ -198,6 +198,7 @@
#include "asterisk/ast_version.h"
#include "asterisk/event.h"
#include "asterisk/tcptls.h"
+#include "asterisk/taskprocessor.h"
#ifndef FALSE
#define FALSE 0
@@ -820,6 +821,12 @@
static struct ast_flags global_flags[2] = {{0}}; /*!< global SIP_ flags */
static char used_context[AST_MAX_CONTEXT]; /*!< name of automatically created context for unloading */
+/*! \brief Number of UDP processing threads */
+static unsigned int num_udpthreads;
+#define DEFAULT_UDPTHREADS 4 /*!< Value used when the config is reset or "udpthreads" fails to parse */
+#define MAX_UDPTHREADS 128 /*!< Upper clamp applied to the "udpthreads" option */
+
+struct ast_taskproc_pool *udpthreads; /*!< Pool that sipsock_read() pushes sip_udp_process() tasks onto. NOTE(review): not static, unlike num_udpthreads - confirm external linkage is intended */
AST_MUTEX_DEFINE_STATIC(netlock);
@@ -1434,8 +1441,9 @@
static struct sip_pvt *dialog_unref(struct sip_pvt *p, char *tag) /* Drop one ao2 reference on p; tag is not used by this variant */
{
- if (p)
+ if (p) {
ao2_ref(p, -1); /* only reached when p is non-NULL */
+ }
return NULL; /* always NULL - presumably so callers can clear their pointer with the result; verify against callers */
}
#endif
@@ -1840,7 +1848,7 @@
static int sip_queryoption(struct ast_channel *chan, int option, void *data, int *datalen);
static const char *sip_get_callid(struct ast_channel *chan);
-static int handle_request_do(struct sip_request *req, struct sockaddr_in *sin);
+static int handle_request(struct sip_request *req, struct sockaddr_in *sin);
static int sip_standard_port(struct sip_socket s);
static int sip_prepare_socket(struct sip_pvt *p);
static int sip_parse_host(char *line, int lineno, char **hostname, int *portnum, enum sip_transport *transport);
@@ -2409,7 +2417,7 @@
}
}
req.socket.ser = ser;
- handle_request_do(&req, &ser->requestor);
+ handle_request(&req, &ser->requestor);
}
cleanup:
@@ -19142,21 +19150,45 @@
return res;
}
-/*! \brief Read data from SIP socket
-\note sipsock_read locks the owner channel while we are processing the SIP message
-\return 1 on error, 0 on success
-\note Successful messages is connected to SIP call and forwarded to handle_incoming()
+struct sip_udp_arg { /*!< Work item that sipsock_read() allocates and hands to sip_udp_process() */
+ struct sip_request req; /*!< Request whose data, len and socket fields are filled in by sipsock_read() */
+ struct sockaddr_in sin; /*!< Peer address written by recvfrom() in sipsock_read() */
+};
+
+static int sip_udp_process(void *data) /* Task callback queued by sipsock_read() through ast_taskproc_pool_push(); data is a struct sip_udp_arg */
+{
+ struct sip_udp_arg *arg = data; /* allocated with ast_calloc() in sipsock_read(); released at the end of this function */
+
+ handle_request(&arg->req, &arg->sin); /* return value is intentionally not inspected here */
+
+ if (arg->req.data) { /* request text buffer created with ast_str_create() in sipsock_read() */
+ ast_free(arg->req.data);
+ arg->req.data = NULL;
+ }
+
+ ast_free(arg); /* the work item itself is freed once the request has been handled */
+
+ return 0; /* 0 is returned unconditionally, whatever handle_request() reported */
+}
+
+/*!
+ * \brief Read data from SIP socket
+ * \return non-zero. A return value of 0 removes this entry from the IO handler.
+ * \note sipsock_read locks the owner channel while we are processing the SIP message
+ * \note A successful message is connected to its SIP call and forwarded to handle_incoming()
*/
static int sipsock_read(int *id, int fd, short events, void *ignore)
{
- struct sip_request req;
- struct sockaddr_in sin = { 0, };
+ struct sip_udp_arg *arg;
int res;
- socklen_t len = sizeof(sin);
+ socklen_t len = sizeof(arg->sin);
static char readbuf[65535];
- memset(&req, 0, sizeof(req));
- res = recvfrom(fd, readbuf, sizeof(readbuf) - 1, 0, (struct sockaddr *)&sin, &len);
+ if (!(arg = ast_calloc(1, sizeof(*arg)))) {
+ return 1;
+ }
+
+ res = recvfrom(fd, readbuf, sizeof(readbuf) - 1, 0, (struct sockaddr *) &arg->sin, &len);
if (res < 0) {
#if !defined(__FreeBSD__)
if (errno == EAGAIN)
@@ -19168,31 +19200,32 @@
return 1;
}
readbuf[res] = '\0';
- if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
+
+ if (!(arg->req.data = ast_str_create(SIP_MIN_PACKET))) {
return 1;
- ast_str_set(&req.data, 0, "%s", readbuf);
- if (res == sizeof(req.data) - 1) {
+ }
+ ast_str_set(&arg->req.data, 0, "%s", readbuf);
+
+ if (res == sizeof(arg->req.data) - 1) {
ast_debug(1, "Received packet exceeds buffer. Data is possibly lost\n");
- req.data->str[sizeof(req.data) - 1] = '\0';
- } else
- req.data->str[res] = '\0';
- req.len = res;
-
- req.socket.fd = sipsock;
- req.socket.type = SIP_TRANSPORT_UDP;
- req.socket.ser = NULL;
- req.socket.port = bindaddr.sin_port;
-
- handle_request_do(&req, &sin);
- if (req.data) {
- ast_free(req.data);
- req.data = NULL;
- }
+ arg->req.data->str[sizeof(arg->req.data) - 1] = '\0';
+ } else {
+ arg->req.data->str[res] = '\0';
+ }
+
+ arg->req.len = res;
+
+ arg->req.socket.fd = sipsock;
+ arg->req.socket.type = SIP_TRANSPORT_UDP;
+ arg->req.socket.ser = NULL;
+ arg->req.socket.port = bindaddr.sin_port;
+
+ ast_taskproc_pool_push(udpthreads, sip_udp_process, arg);
return 1;
}
-static int handle_request_do(struct sip_request *req, struct sockaddr_in *sin)
+static int handle_request(struct sip_request *req, struct sockaddr_in *sin)
{
struct sip_pvt *p;
int recount = 0;
@@ -19606,7 +19639,7 @@
sipsock_read_id = ast_io_add(io, sipsock, sipsock_read, AST_IO_IN, NULL);
/* From here on out, we die whenever asked */
- for(;;) {
+ for (;;) {
/* Check for a reload request */
ast_mutex_lock(&sip_reload_lock);
reloading = sip_reloading;
@@ -19661,9 +19694,21 @@
/*! \brief Start the channel monitor thread */
static int restart_monitor(void)
{
+ if (!udpthreads && !(udpthreads = ast_taskproc_pool_create("SIP"))) {
+ return -1;
+ }
+
+ if (ast_taskproc_pool_set_size(udpthreads, num_udpthreads) != num_udpthreads) {
+ ast_log(LOG_ERROR, "Failed to set number of UDP threads to %u\n",
+ num_udpthreads);
+ return -1;
+ }
+
/* If we're supposed to be stopped -- stay stopped */
- if (monitor_thread == AST_PTHREADT_STOP)
+ if (monitor_thread == AST_PTHREADT_STOP) {
return 0;
+ }
+
ast_mutex_lock(&monlock);
if (monitor_thread == pthread_self()) {
ast_mutex_unlock(&monlock);
@@ -19682,6 +19727,7 @@
}
}
ast_mutex_unlock(&monlock);
+
return 0;
}
@@ -21437,6 +21483,8 @@
global_matchexterniplocally = FALSE;
+ num_udpthreads = DEFAULT_UDPTHREADS;
+
/* Copy the default jb config over global_jbconf */
memcpy(&global_jbconf, &default_jbconf, sizeof(struct ast_jb_conf));
@@ -21829,6 +21877,18 @@
} else {
global_st_refresher = i;
}
+ } else if (!strcasecmp(v->name, "udpthreads")) {
+ if (sscanf(v->value, "%u", &num_udpthreads) != 1) {
+ ast_log(LOG_WARNING, "'%s' not a valid value for number of UDP threads\n",
+ v->value);
+ num_udpthreads = DEFAULT_UDPTHREADS;
+ }
+
+ if (num_udpthreads > MAX_UDPTHREADS) {
+ ast_log(LOG_WARNING, "%u UDP threads is too many. Setting it to %u\n",
+ num_udpthreads, MAX_UDPTHREADS);
+ num_udpthreads = MAX_UDPTHREADS;
+ }
}
}
@@ -22808,6 +22868,11 @@
monitor_thread = AST_PTHREADT_STOP;
ast_mutex_unlock(&monlock);
+ if (udpthreads) {
+ ao2_ref(udpthreads, -1);
+ udpthreads = NULL;
+ }
+
/* Destroy all the dialogs and free their memory */
i = ao2_iterator_init(dialogs, 0);
while ((p = ao2_t_iterator_next(&i, "iterate thru dialogs"))) {
More information about the asterisk-commits
mailing list