[asterisk-commits] branch group/sip-threading r26173 - /team/group/sip-threading/channels/

asterisk-commits at lists.digium.com asterisk-commits at lists.digium.com
Tue May 9 09:25:26 MST 2006


Author: file
Date: Tue May  9 11:25:25 2006
New Revision: 26173

URL: http://svn.digium.com/view/asterisk?rev=26173&view=rev
Log:
Add what I have done so far since we are out of here

Modified:
    team/group/sip-threading/channels/chan_sip.c

Modified: team/group/sip-threading/channels/chan_sip.c
URL: http://svn.digium.com/view/asterisk/team/group/sip-threading/channels/chan_sip.c?rev=26173&r1=26172&r2=26173&view=diff
==============================================================================
--- team/group/sip-threading/channels/chan_sip.c (original)
+++ team/group/sip-threading/channels/chan_sip.c Tue May  9 11:25:25 2006
@@ -864,6 +864,24 @@
 	int packetlen;				/*!< Length of packet */
 	char data[0];
 };	
+
+/*! \brief sip thread - worker that handles one incoming SIP request or response at a time */
+struct sip_thread {
+	ast_mutex_t lock;                       /*!< Thread lock -- taken around every update of req, sin and die */
+	ast_cond_t cond;                        /*!< Thread condition -- signalled to wake it up, for new work or for die */
+	pthread_t thread;                       /*!< Actual thread of this structure (joined when the module unloads) */
+	int num;                                /*!< Thread number, handed out in creation order from 0; used in log messages */
+	int die;                                /*!< Set to 1 at module unload to make the thread leave its loop */
+	struct sip_request req;                 /*!< Copy of the SIP request to process */
+	struct sockaddr_in sin;                 /*!< Copy of the address information of the originator */
+	AST_LIST_ENTRY(sip_thread) entry;       /*!< Linked list information (idle_list or active_list) */
+};
+
+static AST_LIST_HEAD_STATIC(idle_list, sip_thread);	/* Threads with no packet assigned; sipsock_read takes from the head */
+static AST_LIST_HEAD_STATIC(active_list, sip_thread);	/* Threads that have been handed a packet and not yet finished it */
+
+/* Number of worker threads spawned the first time both lists are empty; 0 or less spawns none */
+static int sipmaxthreads = 10;
 
 /*! \brief Structure for SIP user data. User's place calls to us */
 struct sip_user {
@@ -11879,6 +11897,98 @@
 	return res;
 }
 
+/*! \brief Parse a received SIP packet, find (or create) its dialog and pass it to handle_request()
+ * \return 1 if the packet has fewer than two headers and was dropped, 0 otherwise */
+static int sipsock_helper(struct sip_request *req, struct sockaddr_in *sin)
+{
+	struct sip_pvt *p = NULL;
+	int recount = 0, nounlock = 0;	/* recount is tested at the end even when no dialog was found */
+
+	/* Parse the request and look up its method */
+	parse_request(req);
+	req->method = find_sip_method(req->rlPart1);
+	if (ast_test_flag(req, SIP_PKT_DEBUG)) {
+		ast_verbose("--- (%d headers %d lines)", req->headers, req->lines);
+		if (req->headers + req->lines == 0)
+			ast_verbose(" Nat keepalive ");
+		ast_verbose("---\n");
+	}
+
+	if (req->headers < 2) {
+		/* Must have at least two headers */
+		ast_log(LOG_NOTICE, "Must have at least two headers but we only have %d\n", req->headers);
+		return 1;
+	}
+
+retrylock:
+	if ((p = find_call(req, sin, req->method))) {
+		/* find_call() returns p locked; if the owner lock is busy, drop p and retry to avoid deadlock */
+		if (p->owner && ast_channel_trylock(p->owner)) {
+			if (option_debug)
+				ast_log(LOG_DEBUG, "Failed to grab lock, trying again...\n");
+			ast_mutex_unlock(&p->lock);
+			usleep(1);
+			goto retrylock;
+		}
+		p->recv = *sin;
+		if (recordhistory) /* This is a request or response, note what it was for */
+			append_history(p, "Rx", "%s / %s / %s", req->data, get_header(req, "CSeq"), req->rlPart2);
+		if (handle_request(p, req, sin, &recount, &nounlock) == -1) {
+			/* Request failed */
+			if (option_debug)
+				ast_log(LOG_DEBUG, "SIP message could not be handled, bad request: %-70.70s\n", p->callid[0] ? p->callid : "<no callid>");
+		}
+		if (p->owner && !nounlock)
+			ast_channel_unlock(p->owner);
+		ast_mutex_unlock(&p->lock);
+	} else if (option_debug) {
+		/* No dialog was found, so p is NULL here; read the call ID from the packet instead of p */
+		const char *callid = get_header(req, "Call-ID");
+		ast_log(LOG_DEBUG, "Invalid SIP message - rejected , bad request: %-70.70s\n", callid && callid[0] ? callid : "<no callid>");
+	}
+
+	if (recount)
+		ast_update_use_count();
+
+	return 0;
+}
+
+/*! \brief SIP Thread (used to offload request and response processing); runs until its die flag is set */
+static void *sipsock_thread(void *data)
+{
+	struct sip_thread *thread = data;
+
+	if (!thread)
+		return NULL;
+
+	ast_log(LOG_DEBUG, "Sip thread %d starting\n", thread->num);
+
+	ast_mutex_lock(&thread->lock);
+	/* Test die before every wait: a signal sent before we start waiting is not remembered */
+	while (!thread->die) {
+		/* Wait for data to come in */
+		ast_cond_wait(&thread->cond, &thread->lock);
+		/* See if we are being destroyed */
+		if (thread->die)
+			break;
+		/* Otherwise handle provided data */
+		ast_log(LOG_DEBUG, "Thread %d woke up and is servicing data\n", thread->num);
+		sipsock_helper(&thread->req, &thread->sin);
+		/* Move ourselves from active to idle */
+		AST_LIST_LOCK(&active_list);
+		AST_LIST_REMOVE(&active_list, thread, entry);
+		AST_LIST_UNLOCK(&active_list);
+		AST_LIST_LOCK(&idle_list);
+		AST_LIST_INSERT_TAIL(&idle_list, thread, entry);
+		AST_LIST_UNLOCK(&idle_list);
+	}
+	ast_log(LOG_DEBUG, "Thread %d is being destroyed\n", thread->num);
+	ast_mutex_unlock(&thread->lock);
+
+	/* The structure itself is freed by unload_module() after it has joined this thread */
+	return NULL;
+}
+
 /*! \brief Read data from SIP socket
 \note sipsock_read locks the owner channel while we are processing the SIP message
 \return 1 on error, 0 on success
@@ -11886,14 +11996,17 @@
 */
 static int sipsock_read(int *id, int fd, short events, void *ignore)
 {
+	struct sip_thread *thread = NULL;
 	struct sip_request req;
 	struct sockaddr_in sin = { 0, };
-	struct sip_pvt *p;
 	int res;
 	socklen_t len;
-	int nounlock;
-	int recount = 0;
 	char iabuf[INET_ADDRSTRLEN];
+
+	/* Find an idle thread if possible */
+	AST_LIST_LOCK(&idle_list);
+	thread = AST_LIST_REMOVE_HEAD(&idle_list, entry);
+	AST_LIST_UNLOCK(&idle_list);
 
 	len = sizeof(sin);
 	memset(&req, 0, sizeof(req));
@@ -11920,59 +12033,20 @@
 	if (ast_test_flag(&req, SIP_PKT_DEBUG))
 		ast_verbose("\n<-- SIP read from %s:%d: \n%s\n", ast_inet_ntoa(iabuf, sizeof(iabuf), sin.sin_addr), ntohs(sin.sin_port), req.data);
 
-	parse_request(&req);
-	req.method = find_sip_method(req.rlPart1);
-	if (ast_test_flag(&req, SIP_PKT_DEBUG)) {
-		ast_verbose("--- (%d headers %d lines)", req.headers, req.lines);
-		if (req.headers + req.lines == 0) 
-			ast_verbose(" Nat keepalive ");
-		ast_verbose("---\n");
-	}
-
-	if (req.headers < 2) {
-		/* Must have at least two headers */
-		return 1;
-	}
-
-
-	/* Process request, with netlock held */
-retrylock:
-	ast_mutex_lock(&netlock);
-
-	/* Find the active SIP dialog or create a new one */
-	p = find_call(&req, &sin, req.method);	/* returns p locked */
-	if (p) {
-		/* Go ahead and lock the owner if it has one -- we may need it */
-		/* becaues this is deadlock-prone, we need to try and unlock if failed */
-		if (p->owner && ast_channel_trylock(p->owner)) {
-			if (option_debug)
-				ast_log(LOG_DEBUG, "Failed to grab lock, trying again...\n");
-			ast_mutex_unlock(&p->lock);
-			ast_mutex_unlock(&netlock);
-			/* Sleep infintismly short amount of time */
-			usleep(1);
-			goto retrylock;
-		}
-		p->recv = sin;
-		if (recordhistory) /* This is a request or response, note what it was for */
-			append_history(p, "Rx", "%s / %s / %s", req.data, get_header(&req, "CSeq"), req.rlPart2);
-		nounlock = 0;
-		if (handle_request(p, &req, &sin, &recount, &nounlock) == -1) {
-			/* Request failed */
-			if (option_debug)
-				ast_log(LOG_DEBUG, "SIP message could not be handled, bad request: %-70.70s\n", p->callid[0] ? p->callid : "<no callid>");
-		}
-		
-		if (p->owner && !nounlock)
-			ast_channel_unlock(p->owner);
-		ast_mutex_unlock(&p->lock);
+	/* If we have a thread available... pass it there -- otherwise do it in this thread */
+	if (thread) {
+		/* Lock the thread... signal it, and add it to the active list */
+		AST_LIST_LOCK(&active_list);
+		ast_mutex_lock(&thread->lock);
+		ast_cond_signal(&thread->cond);
+		thread->sin = sin;
+		thread->req = req;
+		AST_LIST_INSERT_TAIL(&active_list, thread, entry);
+		AST_LIST_UNLOCK(&active_list);
+		ast_mutex_unlock(&thread->lock);
 	} else {
-		if (option_debug)
-			ast_log(LOG_DEBUG, "Invalid SIP message - rejected , bad request: %-70.70s\n", p->callid[0] ? p->callid : "<no callid>");
-	}
-	ast_mutex_unlock(&netlock);
-	if (recount)
-		ast_update_use_count();
+		sipsock_helper(&req, &sin);
+	}
 
 	return 1;
 }
@@ -13121,6 +13195,8 @@
 	int registry_count = 0, peer_count = 0, user_count = 0;
 	int temp_tos = 0;
 	struct ast_flags debugflag = {0};
+	struct sip_thread *thread = NULL;
+	int threadcount = 0;
 
 	cfg = ast_config_load(config);
 
@@ -13420,6 +13496,35 @@
 		allow_external_domains = 1;
 	}
 	
+	/* Spawn SIP threads if possible */
+	if (sipmaxthreads > 0 && AST_LIST_EMPTY(&idle_list) && AST_LIST_EMPTY(&active_list)) {
+		ast_log(LOG_NOTICE, "Spawning threads\n");
+		while (sipmaxthreads > threadcount) {
+			/* Allocate memory for this thread, and set it up */
+			thread = ast_calloc(1, sizeof(*thread));
+			ast_mutex_init(&thread->lock);
+			ast_cond_init(&thread->cond, NULL);
+			thread->num = threadcount++;
+			/* Good, now fire up an actual thread with this structure and let it go */
+			ast_mutex_lock(&thread->lock);
+			if (ast_pthread_create(&thread->thread, NULL, sipsock_thread, thread) < 0) {
+				/* Bail out ;( */
+				ast_log(LOG_ERROR, "Failed to create SIP thread\n");
+				ast_mutex_unlock(&thread->lock);
+				ast_cond_destroy(&thread->cond);
+				ast_mutex_destroy(&thread->lock);
+				free(thread);
+				break;
+			} else {
+				/* Everything was fine so add it to the list and unlock it */
+				AST_LIST_LOCK(&idle_list);
+				AST_LIST_INSERT_TAIL(&idle_list, thread, entry);
+				AST_LIST_UNLOCK(&idle_list);
+				ast_mutex_unlock(&thread->lock);
+			}
+		}
+	}
+
 	/* Build list of authentication to various SIP realms, i.e. service providers */
  	for (v = ast_variable_browse(cfg, "authentication"); v ; v = v->next) {
  		/* Format for authentication is auth = username:password at realm */
@@ -13999,6 +14104,7 @@
 
 static int unload_module(void *mod)
 {
+	struct sip_thread *thread = NULL;
 	struct sip_pvt *p, *pl;
 	
 	/* First, take us out of the channel type list */
@@ -14018,6 +14124,35 @@
 
 	ast_manager_unregister("SIPpeers");
 	ast_manager_unregister("SIPshowpeer");
+
+	/* Stop idle and active threads */
+	AST_LIST_LOCK(&idle_list);
+	while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, entry))) {
+		ast_mutex_lock(&thread->lock);
+		thread->die = 1;
+		ast_cond_signal(&thread->cond);
+		ast_mutex_unlock(&thread->lock);
+		pthread_join(thread->thread, NULL);
+		/* Free the data */
+		ast_mutex_destroy(&thread->lock);
+		ast_cond_destroy(&thread->cond);
+		free(thread);
+	}
+	AST_LIST_UNLOCK(&idle_list);
+
+	AST_LIST_LOCK(&active_list);
+	while ((thread = AST_LIST_REMOVE_HEAD(&active_list, entry))) {
+                ast_mutex_lock(&thread->lock);
+                thread->die = 1;
+                ast_cond_signal(&thread->cond);
+                ast_mutex_unlock(&thread->lock);
+                pthread_join(thread->thread, NULL);
+                /* Free the data */
+                ast_mutex_destroy(&thread->lock);
+                ast_cond_destroy(&thread->cond);
+                free(thread);
+	}
+	AST_LIST_UNLOCK(&active_list);
 
 	if (!ast_mutex_lock(&iflock)) {
 		/* Hangup all interfaces if they have an owner */



More information about the asterisk-commits mailing list