[asterisk-commits] russell: branch russell/sched_thread r143121 - /team/russell/sched_thread/cha...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Sun Sep 14 19:12:55 CDT 2008


Author: russell
Date: Sun Sep 14 19:12:54 2008
New Revision: 143121

URL: http://svn.digium.com/view/asterisk?view=rev&rev=143121
Log:
Convert chan_sip to use a pool of task processors for processing incoming UDP packets.

Modified:
    team/russell/sched_thread/channels/chan_sip.c

Modified: team/russell/sched_thread/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/russell/sched_thread/channels/chan_sip.c?view=diff&rev=143121&r1=143120&r2=143121
==============================================================================
--- team/russell/sched_thread/channels/chan_sip.c (original)
+++ team/russell/sched_thread/channels/chan_sip.c Sun Sep 14 19:12:54 2008
@@ -198,6 +198,7 @@
 #include "asterisk/ast_version.h"
 #include "asterisk/event.h"
 #include "asterisk/tcptls.h"
+#include "asterisk/taskprocessor.h"
 
 #ifndef FALSE
 #define FALSE    0
@@ -820,6 +821,12 @@
 static struct ast_flags global_flags[2] = {{0}};        /*!< global SIP_ flags */
 static char used_context[AST_MAX_CONTEXT];		/*!< name of automatically created context for unloading */
 
+/*! \brief Number of UDP processing threads */
+static unsigned int num_udpthreads;
+#define DEFAULT_UDPTHREADS 4
+#define MAX_UDPTHREADS 128
+
+struct ast_taskproc_pool *udpthreads;
 
 AST_MUTEX_DEFINE_STATIC(netlock);
 
@@ -1434,8 +1441,9 @@
 
 static struct sip_pvt *dialog_unref(struct sip_pvt *p, char *tag)
 {
-	if (p)
+	if (p) {
 		ao2_ref(p, -1);
+	}
 	return NULL;
 }
 #endif
@@ -1840,7 +1848,7 @@
 static int sip_queryoption(struct ast_channel *chan, int option, void *data, int *datalen);
 static const char *sip_get_callid(struct ast_channel *chan);
 
-static int handle_request_do(struct sip_request *req, struct sockaddr_in *sin);
+static int handle_request(struct sip_request *req, struct sockaddr_in *sin);
 static int sip_standard_port(struct sip_socket s);
 static int sip_prepare_socket(struct sip_pvt *p);
 static int sip_parse_host(char *line, int lineno, char **hostname, int *portnum, enum sip_transport *transport);
@@ -2409,7 +2417,7 @@
 			}
 		}
 		req.socket.ser = ser;
-		handle_request_do(&req, &ser->requestor);
+		handle_request(&req, &ser->requestor);
 	}
 
 cleanup:
@@ -19142,21 +19150,45 @@
 	return res;
 }
 
-/*! \brief Read data from SIP socket
-\note sipsock_read locks the owner channel while we are processing the SIP message
-\return 1 on error, 0 on success
-\note Successful messages is connected to SIP call and forwarded to handle_incoming() 
+struct sip_udp_arg {
+	struct sip_request req;
+	struct sockaddr_in sin;
+};
+
+static int sip_udp_process(void *data)
+{
+	struct sip_udp_arg *arg = data;
+
+	handle_request(&arg->req, &arg->sin);
+
+	if (arg->req.data) {
+		ast_free(arg->req.data);
+		arg->req.data = NULL;
+	}
+
+	ast_free(arg);
+
+	return 0;
+}
+
+/*! 
+ * \brief Read data from SIP socket
+ * \return non-zero.  A return value of 0 removes this entry from the IO handler.
+ * \note sipsock_read locks the owner channel while we are processing the SIP message
+ * \note Successful messages are connected to a SIP call and forwarded to handle_incoming()
 */
 static int sipsock_read(int *id, int fd, short events, void *ignore)
 {
-	struct sip_request req;
-	struct sockaddr_in sin = { 0, };
+	struct sip_udp_arg *arg;
 	int res;
-	socklen_t len = sizeof(sin);
+	socklen_t len = sizeof(arg->sin);
 	static char readbuf[65535];
 
-	memset(&req, 0, sizeof(req));
-	res = recvfrom(fd, readbuf, sizeof(readbuf) - 1, 0, (struct sockaddr *)&sin, &len);
+	if (!(arg = ast_calloc(1, sizeof(*arg)))) {
+		return 1;
+	}
+
+	res = recvfrom(fd, readbuf, sizeof(readbuf) - 1, 0, (struct sockaddr *) &arg->sin, &len);
 	if (res < 0) {
 #if !defined(__FreeBSD__)
 		if (errno == EAGAIN)
@@ -19168,31 +19200,32 @@
 		return 1;
 	}
 	readbuf[res] = '\0';
-	if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
+
+	if (!(arg->req.data = ast_str_create(SIP_MIN_PACKET))) {
 		return 1;
-	ast_str_set(&req.data, 0, "%s", readbuf);
-	if (res == sizeof(req.data) - 1) {
+	}
+	ast_str_set(&arg->req.data, 0, "%s", readbuf);
+
+	if (res == sizeof(arg->req.data) - 1) {
 		ast_debug(1, "Received packet exceeds buffer. Data is possibly lost\n");
-		req.data->str[sizeof(req.data) - 1] = '\0';
- 	} else
-		req.data->str[res] = '\0';
-	req.len = res;
-
-	req.socket.fd 	= sipsock;
-	req.socket.type = SIP_TRANSPORT_UDP;
-	req.socket.ser	= NULL;
-	req.socket.port = bindaddr.sin_port;
-
-	handle_request_do(&req, &sin);
-	if (req.data) {
-		ast_free(req.data);
-		req.data = NULL;
-	}
+		arg->req.data->str[sizeof(arg->req.data) - 1] = '\0';
+ 	} else {
+		arg->req.data->str[res] = '\0';
+	}
+
+	arg->req.len = res;
+
+	arg->req.socket.fd 	= sipsock;
+	arg->req.socket.type = SIP_TRANSPORT_UDP;
+	arg->req.socket.ser	= NULL;
+	arg->req.socket.port = bindaddr.sin_port;
+
+	ast_taskproc_pool_push(udpthreads, sip_udp_process, arg);
 
 	return 1;
 }
 
-static int handle_request_do(struct sip_request *req, struct sockaddr_in *sin) 
+static int handle_request(struct sip_request *req, struct sockaddr_in *sin) 
 {
 	struct sip_pvt *p;
 	int recount = 0;
@@ -19606,7 +19639,7 @@
 		sipsock_read_id = ast_io_add(io, sipsock, sipsock_read, AST_IO_IN, NULL);
 
 	/* From here on out, we die whenever asked */
-	for(;;) {
+	for (;;) {
 		/* Check for a reload request */
 		ast_mutex_lock(&sip_reload_lock);
 		reloading = sip_reloading;
@@ -19661,9 +19694,21 @@
 /*! \brief Start the channel monitor thread */
 static int restart_monitor(void)
 {
+	if (!udpthreads && !(udpthreads = ast_taskproc_pool_create("SIP"))) {
+		return -1;
+	}
+
+	if (ast_taskproc_pool_set_size(udpthreads, num_udpthreads) != num_udpthreads) {
+		ast_log(LOG_ERROR, "Failed to set number of UDP threads to %u\n",
+				num_udpthreads);
+		return -1;
+	}
+
 	/* If we're supposed to be stopped -- stay stopped */
-	if (monitor_thread == AST_PTHREADT_STOP)
+	if (monitor_thread == AST_PTHREADT_STOP) {
 		return 0;
+	}
+
 	ast_mutex_lock(&monlock);
 	if (monitor_thread == pthread_self()) {
 		ast_mutex_unlock(&monlock);
@@ -19682,6 +19727,7 @@
 		}
 	}
 	ast_mutex_unlock(&monlock);
+
 	return 0;
 }
 
@@ -21437,6 +21483,8 @@
 
 	global_matchexterniplocally = FALSE;
 
+	num_udpthreads = DEFAULT_UDPTHREADS;
+
 	/* Copy the default jb config over global_jbconf */
 	memcpy(&global_jbconf, &default_jbconf, sizeof(struct ast_jb_conf));
 
@@ -21829,6 +21877,18 @@
 			} else {
 				global_st_refresher = i;
 			}
+		} else if (!strcasecmp(v->name, "udpthreads")) {
+			if (sscanf(v->value, "%u", &num_udpthreads) != 1) {
+				ast_log(LOG_WARNING, "'%s' not a valid value for number of UDP threads\n", 
+						v->value);
+				num_udpthreads = DEFAULT_UDPTHREADS;
+			}
+
+			if (num_udpthreads > MAX_UDPTHREADS) {
+				ast_log(LOG_WARNING, "%u UDP threads is too many.  Setting it to %u\n", 
+						num_udpthreads, MAX_UDPTHREADS);
+				num_udpthreads = MAX_UDPTHREADS;
+			}
 		}
 	}
 
@@ -22808,6 +22868,11 @@
 	monitor_thread = AST_PTHREADT_STOP;
 	ast_mutex_unlock(&monlock);
 
+	if (udpthreads) {
+		ao2_ref(udpthreads, -1);
+		udpthreads = NULL;
+	}
+
 	/* Destroy all the dialogs and free their memory */
 	i = ao2_iterator_init(dialogs, 0);
 	while ((p = ao2_t_iterator_next(&i, "iterate thru dialogs"))) {




More information about the asterisk-commits mailing list