[asterisk-commits] dvossel: branch dvossel/sip_nonblocking_tcp_client r220211 - /team/dvossel/si...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Sep 24 12:42:33 CDT 2009


Author: dvossel
Date: Thu Sep 24 12:42:28 2009
New Revision: 220211

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=220211
Log:
Implemented alert pipe into tcp_tls thread.


Modified:
    team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c

Modified: team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c?view=diff&rev=220211&r1=220210&r2=220211
==============================================================================
--- team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c (original)
+++ team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c Thu Sep 24 12:42:28 2009
@@ -2098,6 +2098,13 @@
 	char lastmsg[256];		/*!< Last Message sent/received */
 };
 
+enum sip_tcptls_alert { /* values written to and read from a sip_threadinfo alert_pipe */
+	/*! \brief There is new data to be sent out; posted by sip_tcptls_write() */
+	TCPTLS_ALERT_DATA,
+	/*! \brief A request to stop the tcp_handler thread; the thread goes to cleanup on reading it */
+	TCPTLS_ALERT_STOP,
+};
+
 /*! \brief Definition of a thread that handles a socket */
 struct sip_threadinfo {
 	int stop;
@@ -2900,20 +2907,49 @@
 static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session)
 {
 	struct sip_threadinfo *th;
-	th = ao2_alloc(sizeof(*th), NULL);
-
-	if (!th)
+
+	if (!(th = ao2_alloc(sizeof(*th), NULL))) {
 		return NULL;
-
+	}
+
+	th->alert_pipe[0] = th->alert_pipe[1] = -1; /* both ends marked unopened until pipe() succeeds */
+
+	if (pipe(th->alert_pipe) == -1) { /* [0] is polled by the handler thread, [1] is written by sip_tcptls_write() */
+		ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo");
+		return NULL;
+	}
+	ast_log(LOG_NOTICE, " alertpipe is supposed to be up\n"); //todohere remove
 	th->tcptls_session = tcptls_session;
-	if (tcptls_session->ssl)
-		th->type = SIP_TRANSPORT_TLS;
-	else
-		th->type = SIP_TRANSPORT_TCP;
-//todohere create alert pipe!
+	th->type = tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP; /* TLS when tcptls_session->ssl is set, TCP otherwise */
 	ao2_t_link(threadl, th, "Adding new tcptls helper thread");
 
 	return th;
+}
+
+static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t len)
+{ /* Send len bytes of buf on tcptls_session, then post TCPTLS_ALERT_DATA on the alert pipe of the session's sip_threadinfo */
+	int res = 0; /* result of ast_tcptls_server_write(); forced to -1 if the alert write fails */
+	struct sip_threadinfo *th;
+	struct sip_threadinfo tmp = { /* lookup key: only tcptls_session is filled in */
+		.tcptls_session = tcptls_session,
+	};
+	enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA;
+	if (!(th = ao2_t_find(threadl, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) {
+		return -1; /* no sip_threadinfo found in threadl for this session */
+	}
+
+	/* must lock the thread info object to guarantee control of alert_pipe fd */
+	ao2_lock(th);
+	res = ast_tcptls_server_write(tcptls_session, buf, len); //todohere, replace this with write to th info queue
+	if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) { /* NOTE(review): alert is posted even when the write above failed */
+		ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno));
+		//todohere remove from data queue since alert failed
+		res = -1;
+	}
+	ao2_unlock(th);
+
+	ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it"); /* drop the reference returned by ao2_t_find() */
+	return res;
 }
 
 /*! \brief SIP TCP thread management function
@@ -2925,6 +2961,7 @@
 	struct sip_request req = { 0, } , reqcpy = { 0, };
 	struct sip_threadinfo *me;
 	char buf[1024] = "";
+	struct pollfd fds[2] = { { 0 }, { 0 }, };
 
 	/* If this is a server connection, create a new thread info object.
 	 * 
@@ -2951,6 +2988,11 @@
 	me->threadid = pthread_self();
 	ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
 
+	/* set up pollfd to watch for reads on both the socket and the alert_pipe */
+	fds[0].fd = tcptls_session->fd;
+	fds[1].fd = me->alert_pipe[0];
+	fds[0].events = fds[1].events = POLLIN|POLLPRI;
+
 	if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
 		goto cleanup;
 	if (!(reqcpy.data = ast_str_create(SIP_MIN_PACKET)))
@@ -2979,54 +3021,110 @@
 			req.socket.port = htons(ourport_tcp);
 		}
 		req.socket.fd = tcptls_session->fd;
-		res = ast_wait_for_input(tcptls_session->fd, -1);
+
+		res = ast_poll(fds, 2, -1); /* polls for both socket and alert_pipe */
 		if (res < 0) {
 			ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "SSL": "TCP", res);
 			goto cleanup;
 		}
 
-		/* Read in headers one line at a time */
-		while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
-			ast_mutex_lock(&tcptls_session->lock);
-			if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
-				ast_mutex_unlock(&tcptls_session->lock);
-				goto cleanup;
-			}
-			ast_mutex_unlock(&tcptls_session->lock);
-			if (me->stop)
-				 goto cleanup;
-			ast_str_append(&req.data, 0, "%s", buf);
-			req.len = req.data->used;
-		}
-		copy_request(&reqcpy, &req);
-		parse_request(&reqcpy);
-		/* In order to know how much to read, we need the content-length header */
-		if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
-			while (cl > 0) {
-				size_t bytes_read;
+		/* handle the socket event, check for both reads from the socket fd,
+		 * and writes from alert_pipe fd */
+		if (fds[0].revents) { /* there is data on the socket to be read */
+			struct ast_str *str_save;
+
+			/* clear request structure */
+			str_save = req.data;
+			memset(&req, 0, sizeof(req));
+			req.data = str_save;
+			ast_str_reset(req.data);
+
+			str_save = reqcpy.data;
+			memset(&reqcpy, 0, sizeof(reqcpy));
+			reqcpy.data = str_save;
+			ast_str_reset(reqcpy.data);
+
+			memset(buf, 0, sizeof(buf));
+
+			if (tcptls_session->ssl) {
+				set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
+				req.socket.port = htons(ourport_tls);
+			} else {
+				set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
+				req.socket.port = htons(ourport_tcp);
+			}
+			req.socket.fd = tcptls_session->fd;
+
+			/* Read in headers one line at a time */
+			while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
 				ast_mutex_lock(&tcptls_session->lock);
-				if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+				if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
 					ast_mutex_unlock(&tcptls_session->lock);
 					goto cleanup;
 				}
-				buf[bytes_read] = '\0';
 				ast_mutex_unlock(&tcptls_session->lock);
 				if (me->stop)
-					goto cleanup;
-				cl -= strlen(buf);
+					 goto cleanup;
 				ast_str_append(&req.data, 0, "%s", buf);
 				req.len = req.data->used;
 			}
-		}
-		/*! \todo XXX If there's no Content-Length or if the content-length and what
-				we receive is not the same - we should generate an error */
-
-		req.socket.tcptls_session = tcptls_session;
-		handle_request_do(&req, &tcptls_session->remote_address);
+			copy_request(&reqcpy, &req);
+			parse_request(&reqcpy);
+			/* In order to know how much to read, we need the content-length header */
+			if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
+				while (cl > 0) {
+					size_t bytes_read;
+					ast_mutex_lock(&tcptls_session->lock);
+					if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+						ast_mutex_unlock(&tcptls_session->lock);
+						goto cleanup;
+					}
+					buf[bytes_read] = '\0';
+					ast_mutex_unlock(&tcptls_session->lock);
+					if (me->stop)
+						goto cleanup;
+					cl -= strlen(buf);
+					ast_str_append(&req.data, 0, "%s", buf);
+					req.len = req.data->used;
+				}
+			}
+			/*! \todo XXX If there's no Content-Length or if the content-length and what
+					we receive is not the same - we should generate an error */
+
+			req.socket.tcptls_session = tcptls_session;
+			handle_request_do(&req, &tcptls_session->remote_address);
+		} else if (fds[1].revents) { /* alert_pipe indicates there is data in the send queue to be sent */
+			enum sip_tcptls_alert alert;
+
+			if (read(me->alert_pipe[0], &alert, sizeof(alert)) == -1) {
+				ast_log(LOG_ERROR, "read() failed: %s\n", strerror(errno));
+				continue;
+			}
+
+			switch (alert) {
+			case TCPTLS_ALERT_STOP:
+				goto cleanup;
+			case TCPTLS_ALERT_DATA:
+				ast_log(LOG_NOTICE, "GOT SEND ALERT!\n"); //todohere remove
+				//todohere check threadinfo read queue for packets to send
+				break;
+			default:
+				ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert);
+			}
+		}
 	}
 
 cleanup:
 	ao2_t_unlink(threadl, me, "Removing tcptls helper thread, thread is closing");
+	ao2_lock(me); /* alert_pipe fds are also used under this lock by sip_tcptls_write() */
+	if (me->alert_pipe[0] > -1) { /* read end: guard on its own descriptor, not the write end's */
+		close(me->alert_pipe[0]);
+	}
+	if (me->alert_pipe[1] > -1) { /* write end */
+		close(me->alert_pipe[1]);
+	}
+	me->alert_pipe[0] = me->alert_pipe[1] = -1; /* mark both ends closed so they are not reused or closed twice */
+	ao2_unlock(me);
 	ao2_t_ref(me, -1, "Closing tcptls thread, decrementing ref of sip_threadinfo");
 cleanup2:
 	fclose(tcptls_session->f);
@@ -3508,7 +3606,7 @@
 		res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in));
 	} else if (p->socket.tcptls_session) {
 		if (p->socket.tcptls_session->f) {
-			res = ast_tcptls_server_write(p->socket.tcptls_session, data->str, len);
+			res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
 		} else {
 			ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len);
 			return XMIT_ERROR;




More information about the asterisk-commits mailing list