[asterisk-commits] dhubbard: branch group/taskprocessors r111340 - /team/group/taskprocessors/channels/chan_sip.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 26 22:00:36 CDT 2008


Author: dhubbard
Date: Wed Mar 26 22:00:36 2008
New Revision: 111340

URL: http://svn.digium.com/view/asterisk?view=rev&rev=111340
Log:
sipsock_read() now uses a taskprocessor in chan_sip.  This works as-is with 1 channel (which is all I tested), but it still needs some work on TCP integration.  I want to load test this.

Modified:
    team/group/taskprocessors/channels/chan_sip.c

Modified: team/group/taskprocessors/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/taskprocessors/channels/chan_sip.c?view=diff&rev=111340&r1=111339&r2=111340
==============================================================================
--- team/group/taskprocessors/channels/chan_sip.c (original)
+++ team/group/taskprocessors/channels/chan_sip.c Wed Mar 26 22:00:36 2008
@@ -176,6 +176,8 @@
 #include "asterisk/ast_version.h"
 #include "asterisk/event.h"
 #include "asterisk/tcptls.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
 
 #ifndef FALSE
 #define FALSE    0
@@ -1303,6 +1305,12 @@
 	struct sip_st_dlg *stimer;		/*!< SIP Session-Timers */              
 };
 
+static struct ast_taskproducer *taskproducer;
+struct sip_handle_request_task {
+	struct sip_request req;
+	struct sockaddr_in sin;
+};
+
 /*! Max entires in the history list for a sip_pvt */
 #define MAX_HISTORY_ENTRIES 50
 
@@ -1696,7 +1704,7 @@
 static int sip_queryoption(struct ast_channel *chan, int option, void *data, int *datalen);
 static const char *sip_get_callid(struct ast_channel *chan);
 
-static int handle_request_do(struct sip_request *req, struct sockaddr_in *sin);
+static int handle_request_do(struct a_task *task);
 static int sip_standard_port(struct sip_socket s);
 static int sip_prepare_socket(struct sip_pvt *p);
 
@@ -2238,7 +2246,11 @@
 			}
 		}
 		req.socket.ser = ser;
-		handle_request_do(&req, &ser->requestor);
+#if 0
+		handle_request_do(&req, &ser->requestor);\
+#else
+		ast_log(LOG_ERROR, "DWAYNE: you did a bad thing here\n");
+#endif
 	}
 
 cleanup:
@@ -18066,14 +18078,16 @@
 */
 static int sipsock_read(int *id, int fd, short events, void *ignore)
 {
-	struct sip_request req;
-	struct sockaddr_in sin = { 0, };
+	struct sip_handle_request_task *taskdata;
+	struct a_task *task;
 	int res;
-	socklen_t len = sizeof(sin);
+	socklen_t len;
 	static char readbuf[65535];
-
-	memset(&req, 0, sizeof(req));
-	res = recvfrom(fd, readbuf, sizeof(readbuf) - 1, 0, (struct sockaddr *)&sin, &len);
+	
+	taskdata = ast_calloc(1, sizeof(*taskdata));
+	len = sizeof(struct sockaddr_in);
+
+	res = recvfrom(fd, readbuf, sizeof(readbuf) - 1, 0, (struct sockaddr *)&taskdata->sin, &len);
 	if (res < 0) {
 #if !defined(__FreeBSD__)
 		if (errno == EAGAIN)
@@ -18085,35 +18099,37 @@
 		return 1;
 	}
 	readbuf[res] = '\0';
-	if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
+	if (!(taskdata->req.data = ast_str_create(SIP_MIN_PACKET)))
 		return 1;
-	ast_str_set(&req.data, 0, "%s", readbuf);
-	if (res == sizeof(req.data) - 1) {
+	ast_str_set(&taskdata->req.data, 0, "%s", readbuf);
+	if (res == sizeof(taskdata->req.data) - 1) {
 		ast_debug(1, "Received packet exceeds buffer. Data is possibly lost\n");
-		req.data->str[sizeof(req.data) - 1] = '\0';
+		taskdata->req.data->str[sizeof(taskdata->req.data) - 1] = '\0';
  	} else
-		req.data->str[res] = '\0';
-	req.len = res;
-
-	req.socket.fd 	= sipsock;
-	req.socket.type = SIP_TRANSPORT_UDP;
-	req.socket.ser	= NULL;
-	req.socket.port = bindaddr.sin_port;
-	req.socket.lock = NULL;
-
-	handle_request_do(&req, &sin);
-	if (req.data)
-		ast_free(req.data);
+		taskdata->req.data->str[res] = '\0';
+	taskdata->req.len = res;
+
+	taskdata->req.socket.fd 	= sipsock;
+	taskdata->req.socket.type = SIP_TRANSPORT_UDP;
+	taskdata->req.socket.ser	= NULL;
+	taskdata->req.socket.port = bindaddr.sin_port;
+	taskdata->req.socket.lock = NULL;
+
+	task = ast_task_alloc(handle_request_do, taskdata, "sipsock_read");
+	taskproducer->queue_task(taskproducer, task);
 
 	return 1;
 }
 
-static int handle_request_do(struct sip_request *req, struct sockaddr_in *sin) 
+static int handle_request_do(struct a_task *task) 
 {
 	struct sip_pvt *p;
 	int recount = 0;
 	int nounlock = 0;
 	int lockretry;
+	struct sip_handle_request_task *tomato = (struct sip_handle_request_task *)task->_datap;
+	struct sip_request *req = (struct sip_request *)&tomato->req;
+	struct sockaddr_in *sin = (struct sockaddr_in *)&tomato->sin;
 
 	if (sip_debug_test_addr(sin))	/* Set the debug flag early on packet level */
 		req->debug = 1;
@@ -18132,8 +18148,13 @@
 		ast_verbose("--- (%d headers %d lines)%s ---\n", req->headers, req->lines, (req->headers + req->lines == 0) ? " Nat keepalive" : "");
 
 	if (req->headers < 2) {	/* Must have at least two headers */
-		ast_free(req->data);
-		req->data = NULL;
+		if (req->data) {
+			ast_free(req->data);
+			req->data = NULL;
+		}
+		ast_free(task->_datap);
+		task->_datap = NULL;
+		ast_task_free(task);
 		return 1;
 	}
 
@@ -18146,6 +18167,13 @@
 		if (p == NULL) {
 			ast_debug(1, "Invalid SIP message - rejected , no callid, len %d\n", req->len);
 			ast_mutex_unlock(&netlock);
+			if (req->data) {
+				ast_free(req->data);
+				req->data = NULL;
+			}
+			ast_free(task->_datap);
+			task->_datap = NULL;
+			ast_task_free(task);
 			return 1;
 		}
 
@@ -18174,6 +18202,13 @@
 			transmit_response(p, "503 Server error", req);	/* We must respond according to RFC 3261 sec 12.2 */
 		/* XXX We could add retry-after to make sure they come back */
 		append_history(p, "LockFail", "Owner lock failed, transaction failed.");
+		if (req->data) {
+			ast_free(req->data);
+			req->data = NULL;
+		}
+		ast_free(task->_datap);
+		task->_datap = NULL;
+		ast_task_free(task);
 		return 1;
 	}
 
@@ -18191,6 +18226,13 @@
 	sip_pvt_unlock(p);
 	ast_mutex_unlock(&netlock);
 
+	if (req->data) {
+		ast_free(req->data);
+		req->data = NULL;
+	}
+	ast_free(task->_datap);
+	task->_datap = NULL;
+	ast_task_free(task);
 	return 1;
 }
 
@@ -21437,6 +21479,10 @@
 static int load_module(void)
 {
 	ast_verbose("SIP channel loading...\n");
+
+	/* this is not the best place to put this */
+	taskproducer = ast_taskproducer_alloc("sipsock");
+
 	ASTOBJ_CONTAINER_INIT(&userl);	/* User object list */
 	ASTOBJ_CONTAINER_INIT(&peerl);	/* Peer object list */
 	ASTOBJ_CONTAINER_INIT(&regl);	/* Registry object list */
@@ -21513,7 +21559,9 @@
 	struct sip_pvt *p, *pl;
 	struct sip_threadinfo *th;
 	struct ast_context *con;
-	
+
+	ao2_ref(taskproducer, -1);
+
 	/* First, take us out of the channel type list */
 	ast_channel_unregister(&sip_tech);
 




More information about the asterisk-commits mailing list