[asterisk-commits] dvossel: branch dvossel/sip_nonblocking_tcp_client r220533 - in /team/dvossel...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Sep 25 12:55:40 CDT 2009


Author: dvossel
Date: Fri Sep 25 12:55:35 2009
New Revision: 220533

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=220533
Log:
This patch moves tcp/tls connection setup into the tcp helper thread


Modified:
    team/dvossel/sip_nonblocking_tcp_client/apps/app_externalivr.c
    team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c
    team/dvossel/sip_nonblocking_tcp_client/include/asterisk/tcptls.h
    team/dvossel/sip_nonblocking_tcp_client/main/tcptls.c

Modified: team/dvossel/sip_nonblocking_tcp_client/apps/app_externalivr.c
URL: http://svnview.digium.com/svn/asterisk/team/dvossel/sip_nonblocking_tcp_client/apps/app_externalivr.c?view=diff&rev=220533&r1=220532&r2=220533
==============================================================================
--- team/dvossel/sip_nonblocking_tcp_client/apps/app_externalivr.c (original)
+++ team/dvossel/sip_nonblocking_tcp_client/apps/app_externalivr.c Fri Sep 25 12:55:35 2009
@@ -457,8 +457,9 @@
 		ivr_desc.local_address.sin_family = AF_INET;
 		ivr_desc.local_address.sin_port = htons(port);
 		memcpy(&ivr_desc.local_address.sin_addr.s_addr, hp.hp.h_addr, hp.hp.h_length);
-		ser = ast_tcptls_client_start(&ivr_desc);
-
+		if ((ser = ast_tcptls_client_create(&ivr_desc))) {
+			ser = ast_tcptls_client_start(ser);
+		}
 		if (!ser) {
 			goto exit;
 		}

Modified: team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c?view=diff&rev=220533&r1=220532&r2=220533
==============================================================================
--- team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c (original)
+++ team/dvossel/sip_nonblocking_tcp_client/channels/chan_sip.c Fri Sep 25 12:55:35 2009
@@ -2911,6 +2911,17 @@
 	}
 }
 
+static void sip_tcptls_client_args_destructor(void *obj)
+{
+	struct ast_tcptls_session_args *args = obj;
+
+	if (args->tls_cfg) {
+		ast_free(args->tls_cfg);
+	}
+	if (!ast_strlen_zero(args->name)) {
+		ast_free((char *) args->name);
+	}
+}
 static void sip_threadinfo_destructor(void *obj)
 {
 	struct sip_threadinfo *th = obj;
@@ -2926,21 +2937,17 @@
 	while ((packet = AST_LIST_REMOVE_HEAD(&th->packet_q, entry))) {
 		ao2_t_ref(packet, -1, "thread destruction, removing packet from frame queue");
 	}
-}
-
-/*! \brief SIP TCP connection handler */
-static void *sip_tcp_worker_fn(void *data)
-{
-	struct ast_tcptls_session_instance *tcptls_session = data;
-
-	return _sip_tcp_helper_thread(NULL, tcptls_session);
+
+	if (th->tcptls_session) {
+		ao2_t_ref(th->tcptls_session, -1, "remove tcptls_session for sip_threadinfo object");
+	}
 }
 
 static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session)
 {
 	struct sip_threadinfo *th;
 
-	if (!(th = ao2_alloc(sizeof(*th), sip_threadinfo_destructor))) {
+	if (!tcptls_session || !(th = ao2_alloc(sizeof(*th), sip_threadinfo_destructor))) {
 		return NULL;
 	}
 
@@ -2950,10 +2957,11 @@
 		ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo");
 		return NULL;
 	}
+	ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object");
 	th->tcptls_session = tcptls_session;
 	th->type = tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP;
 	ao2_t_link(threadl, th, "Adding new tcptls helper thread");
-
+	ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains");
 	return th;
 }
 
@@ -2961,43 +2969,71 @@
 {
 	int res = len;
 	struct sip_threadinfo *th = NULL;
+	struct tcptls_packet *packet = NULL;
 	struct sip_threadinfo tmp = {
 		.tcptls_session = tcptls_session,
 	};
 	enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA;
-	struct tcptls_packet *packet = NULL;
+
+	if (!tcptls_session) {
+		return XMIT_ERROR;
+	}
+
+	ast_mutex_lock(&tcptls_session->lock);
+
+	if (tcptls_session->fd == -1) {
+		goto tcptls_write_setup_error;
+	}
 	if (!(th = ao2_t_find(threadl, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) {
-		return -1;
-	}
-
-	/* must lock the thread info object to guarantee control of alert_pipe fd and the packet queue */
-
+		goto tcptls_write_setup_error;
+	}
 	if (!(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor))) {
-		ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet");
-		return -1;
-	}
-
+		goto tcptls_write_setup_error;
+	}
 	if (!(packet->data = ast_str_create(len))) {
-		ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet");
-		ao2_t_ref(packet, -1, "could not allocate packet's data");
-		return -1;
-	}
+		goto tcptls_write_setup_error;
+	}
+
+	/* goto tcptls_write_error should _NOT_ be used beyond this point */
 	ast_str_set(&packet->data, 0, "%s", (char *) buf);
 	packet->len = len;
 
-	/* alert tcptls thread handler that there is a packet to be sent */
+	/* alert tcptls thread handler that there is a packet to be sent.
+	 * must lock the thread info object to guarantee control of
+	 * alert_pipe fd and the packet queue */
 	ao2_lock(th);
 	AST_LIST_INSERT_TAIL(&th->packet_q, packet, entry);
 	if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) {
 		ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno));
 		AST_LIST_REMOVE_HEAD(&th->packet_q, entry);
 		ao2_t_ref(packet, -1, "could not write to alert pipe, remove packet");
-		res = -1;
-	}
+		res = XMIT_ERROR;
+	}
+
+	/* clean up locks, remove threadinfo ref from ao2_find */
 	ao2_unlock(th);
-
+	ast_mutex_unlock(&tcptls_session->lock);
 	ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it");
 	return res;
+
+tcptls_write_setup_error:
+	if (th) {
+		ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet");
+	}
+	if (packet) {
+		ao2_t_ref(packet, -1, "could not allocate packet's data");
+	}
+	ast_mutex_unlock(&tcptls_session->lock);
+
+	return XMIT_ERROR;
+}
+
+/*! \brief SIP TCP connection handler */
+static void *sip_tcp_worker_fn(void *data)
+{
+	struct ast_tcptls_session_instance *tcptls_session = data;
+
+	return _sip_tcp_helper_thread(NULL, tcptls_session);
 }
 
 /*! \brief SIP TCP thread management function
@@ -3010,6 +3046,7 @@
 	struct sip_threadinfo *me;
 	char buf[1024] = "";
 	struct pollfd fds[2] = { { 0 }, { 0 }, };
+	struct ast_tcptls_session_args *ca = NULL;
 
 	/* If this is a server connection, create a new thread info object.
 	 * 
@@ -3019,18 +3056,22 @@
 	 * A threadinfo object has 2 references once it is created, one for the table, and one for this thread.
 	 */
 	if (!tcptls_session->client) {
-		me = sip_threadinfo_create(tcptls_session);
+		if (!(me = sip_threadinfo_create(tcptls_session))) {
+			goto cleanup2;
+		}
+		ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread");
 	} else {
 		struct sip_threadinfo tmp = {
 			.tcptls_session = tcptls_session,
 		};
-		if ((me = ao2_t_find(threadl, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) {
-			ao2_t_ref(me, -1, "In tcp_helper_thread, unref threadinfo object after finding it");
-		}
-	}
-
-	if (!me) {
-		goto cleanup2;
+		ca = tcptls_session->parent;
+		if (!(me = ao2_t_find(threadl, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) {
+			goto cleanup2;
+		}
+
+		if (!(tcptls_session = ast_tcptls_client_start(tcptls_session))) {
+			goto cleanup;
+		}
 	}
 
 	me->threadid = pthread_self();
@@ -3165,7 +3206,6 @@
 					ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed");
 				}
 				ao2_unlock(me);
-
 				break;
 			default:
 				ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert);
@@ -3175,11 +3215,8 @@
 
 cleanup:
 	ao2_t_unlink(threadl, me, "Removing tcptls helper thread, thread is closing");
-	ao2_t_ref(me, -1, "Closing tcptls thread, decrementing ref of sip_threadinfo");
+	ao2_t_ref(me, -1, "Removing tcp_helper_threads threadinfo ref");
 cleanup2:
-	fclose(tcptls_session->f);
-	tcptls_session->f = NULL;
-	tcptls_session->fd = -1;
 	if (reqcpy.data) {
 		ast_free(reqcpy.data);
 	}
@@ -3191,8 +3228,27 @@
 
 	ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
 
-	ao2_ref(tcptls_session, -1);
-	tcptls_session = NULL;
+	/* if client, we own the parent session arguments and must decrement ref */
+	if (ca) {
+		ao2_t_ref(ca, -1, "closing tcptls thread, getting rid of client tcptls_session arguments");
+	}
+
+	if (tcptls_session) {
+		ast_mutex_lock(&tcptls_session->lock);
+		if (tcptls_session->f) {
+			fclose(tcptls_session->f);
+			tcptls_session->f = NULL;
+		}
+		if (tcptls_session->fd != -1) {
+			close(tcptls_session->fd);
+			tcptls_session->fd = -1;
+		}
+		tcptls_session->parent = NULL;
+		ast_mutex_unlock(&tcptls_session->lock);
+
+		ao2_ref(tcptls_session, -1);
+		tcptls_session = NULL;
+	}
 	return NULL;
 }
 
@@ -3649,25 +3705,14 @@
 	if (sip_prepare_socket(p) < 0)
 		return XMIT_ERROR;
 
-	if (p->socket.tcptls_session)
-		ast_mutex_lock(&p->socket.tcptls_session->lock);
-
-	if (p->socket.type & SIP_TRANSPORT_UDP) {
+	if (p->socket.type == SIP_TRANSPORT_UDP) {
 		res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in));
 	} else if (p->socket.tcptls_session) {
-		if (p->socket.tcptls_session->f) {
-			res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
-		} else {
-			ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len);
-			return XMIT_ERROR;
-		}
+		res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
 	} else {
 		ast_debug(2, "Socket type is TCP but no tcptls_session is present to write to\n");
 		return XMIT_ERROR;
 	}
-
-	if (p->socket.tcptls_session)
-		ast_mutex_unlock(&p->socket.tcptls_session->lock);
 
 	if (res == -1) {
 		switch (errno) {
@@ -12376,6 +12421,11 @@
 	destroy_association(peer);	/* remove registration data from storage */
 	set_socket_transport(&peer->socket, peer->default_outbound_transport);
 
+	if (peer->socket.tcptls_session) {
+		ao2_ref(peer->socket.tcptls_session, -1);
+		peer->socket.tcptls_session = NULL;
+	}
+
 	manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: SIP\r\nPeer: SIP/%s\r\nPeerStatus: Unregistered\r\nCause: Expired\r\n", peer->name);
 	register_peer_exten(peer, FALSE);	/* Remove regexten */
 	ast_devstate_changed(AST_DEVICE_UNKNOWN, "SIP/%s", peer->name);
@@ -22792,13 +22842,20 @@
 	static const char name[] = "SIP socket";
 	struct sip_threadinfo *th;
 	struct ast_tcptls_session_instance *tcptls_session;
-	struct ast_tcptls_session_args ca = {
+	struct ast_tcptls_session_args tmp_ca = {
 		.name = name,
 		.accept_fd = -1,
 	};
-
-	if (s->fd != -1)
-		return s->fd;	/* This socket is already active */
+	struct ast_tcptls_session_args *ca;
+
+	/* check to see if a socket is already active */
+	if ((s->fd != -1) && (s->type == SIP_TRANSPORT_UDP)) {
+		return s->fd;
+	} else if ((s->type & (SIP_TRANSPORT_TCP | SIP_TRANSPORT_TLS)) &&
+			(s->tcptls_session) &&
+			(s->tcptls_session->fd != -1)) {
+		return s->tcptls_session->fd;
+	}
 
 	/*! \todo Check this... This might be wrong, depending on the proxy configuration
 		If proxy is in "force" mode its correct.
@@ -22807,14 +22864,24 @@
 		s->type = p->outboundproxy->transport;
 	}
 
-	if (s->type & SIP_TRANSPORT_UDP) {
+	if (s->type == SIP_TRANSPORT_UDP) {
 		s->fd = sipsock;
 		return s->fd;
 	}
 
-	ca.remote_address = *(sip_real_dst(p));
-
-	if ((tcptls_session = sip_tcp_locate(&ca.remote_address))) {	/* Check if we have a thread handling a socket connected to this IP/port */
+	/* At this point we are dealing with a TCP/TLS connection
+	 * 1. We need to check to see if a connectin thread exists
+	 *    for this address, if so use that.
+	 * 2. If a thread does not exist for this address, but the tcptls_session
+	 *    exists on the socket, the connect must have been closed. Don't attempt
+	 *    to reopen, dialog is dead.
+	 * 3. If no tcptls_session thread exists for the address, and no tcptls_session
+	 *    already exists on the socket, create a new one and launch a new thread.
+	 */
+
+	/* 1.  check for existing threads */
+	tmp_ca.remote_address = *(sip_real_dst(p));
+	if ((tcptls_session = sip_tcp_locate(&tmp_ca.remote_address))) {
 		s->fd = tcptls_session->fd;
 		if (s->tcptls_session) {
 			ao2_ref(s->tcptls_session, -1);
@@ -22822,58 +22889,78 @@
 		}
 		s->tcptls_session = tcptls_session;
 		return s->fd;
-	}
-
-	if (s->tcptls_session && s->tcptls_session->parent->tls_cfg) {
-		ca.tls_cfg = s->tcptls_session->parent->tls_cfg;
-	} else {
-		if (s->type & SIP_TRANSPORT_TLS) {
-			ca.tls_cfg = ast_calloc(1, sizeof(*ca.tls_cfg));
-			if (!ca.tls_cfg)
-				return -1;
-			memcpy(ca.tls_cfg, &default_tls_cfg, sizeof(*ca.tls_cfg));
-			if (!ast_strlen_zero(p->tohost))
-				ast_copy_string(ca.hostname, p->tohost, sizeof(ca.hostname));
-		}
-	}
-	
-	if (s->tcptls_session) {
-		/* the pvt socket already has a server instance ... */
-	} else {
-		s->tcptls_session = ast_tcptls_client_start(&ca); /* Start a client connection to this address */
-	}
-
-	if (!s->tcptls_session) {
-		if (ca.tls_cfg)
-			ast_free(ca.tls_cfg);
+	/* 2.  Thread not found, if tcptls_session already exists, it once had a thread and is now terminated */
+	} else if (s->tcptls_session) {
+		/* at this point, if the tcptls_session exists, but no thread was found
+		 * for it, that means a thread existed but has been closed. Currently no
+		 * attempt is made to reconnect, XXX whether reconnection is necessary
+		 * needs to be investigated */
 		return -1;
 	}
 
-	s->fd = ca.accept_fd;
+	/* 3.  Create a new TCP/TLS client connection */
+	/* create new session arguments for the client connection */
+	if (!(ca = ao2_alloc(sizeof(*ca), sip_tcptls_client_args_destructor))) {
+		goto create_tcptls_session_fail;
+	}
+
+	ca->name = ast_strdup(name);
+	ca->accept_fd = -1;
+	ca->remote_address = *(sip_real_dst(p));
+	/* if type is TLS, we need to create a tls cfg for this session arg */
+	if (s->type == SIP_TRANSPORT_TLS) {
+		ca->tls_cfg = ast_calloc(1, sizeof(*ca->tls_cfg));
+		if (!ca->tls_cfg) {
+			goto create_tcptls_session_fail;
+		}
+		memcpy(ca->tls_cfg, &default_tls_cfg, sizeof(*ca->tls_cfg));
+		/* this host is used as the common name in ssl/tls */
+		if (!ast_strlen_zero(p->tohost)) {
+			ast_copy_string(ca->hostname, p->tohost, sizeof(ca->hostname));
+		}
+	}
+
+	/* Create a client connection for address, this does not start the connection, just sets it up. */
+	if (!(s->tcptls_session = ast_tcptls_client_create(ca))) {
+		goto create_tcptls_session_fail;
+	}
+
+	s->fd = s->tcptls_session->fd;
 
 	/* client connections need to have the sip_threadinfo object created before
 	 * the thread is detached.  This ensures the alert_pipe is up before it will
 	 * be used.  Note that this function links the new threadinfo object into the
 	 * threadl container. */
 	if (!(th = sip_threadinfo_create(s->tcptls_session))) {
-		close(ca.accept_fd);
-		s->fd = ca.accept_fd = -1;
-		return -1;
+		goto create_tcptls_session_fail;
 	}
 
 	/* Give the new thread a reference */
 	ao2_ref(s->tcptls_session, +1);
 
-	if (ast_pthread_create_background(&ca.master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
-		ast_debug(1, "Unable to launch '%s'.", ca.name);
+	if (ast_pthread_create_background(&ca->master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
+		ast_debug(1, "Unable to launch '%s'.", ca->name);
+		ao2_ref(s->tcptls_session, -1); /* take away the thread ref we just gave it */
+		goto create_tcptls_session_fail;
+	}
+
+	return s->fd;
+
+create_tcptls_session_fail:
+	if (ca) {
+		ao2_t_ref(ca, -1, "failed to create client, getting rid of client tcptls_session arguments");
+	}
+	if (s->tcptls_session) {
+		close(tcptls_session->fd);
+		s->fd = tcptls_session->fd = -1;
+		ao2_ref(s->tcptls_session, -1);
+		s->tcptls_session = NULL;
+	}
+	if (th) {
 		ao2_t_unlink(threadl, th, "Removing tcptls thread info object, thread failed to open");
-		ao2_ref(s->tcptls_session, -1);
-		close(ca.accept_fd);
-		s->fd = ca.accept_fd = -1;
-		return -1;
-	}
-
-	return s->fd;
+	}
+
+	return -1;
 }
 
 /*!

Modified: team/dvossel/sip_nonblocking_tcp_client/include/asterisk/tcptls.h
URL: http://svnview.digium.com/svn/asterisk/team/dvossel/sip_nonblocking_tcp_client/include/asterisk/tcptls.h?view=diff&rev=220533&r1=220532&r2=220533
==============================================================================
--- team/dvossel/sip_nonblocking_tcp_client/include/asterisk/tcptls.h (original)
+++ team/dvossel/sip_nonblocking_tcp_client/include/asterisk/tcptls.h Fri Sep 25 12:55:35 2009
@@ -161,7 +161,9 @@
  * and starts a thread for handling accept()
  * \version 1.6.1 changed desc parameter to be of ast_tcptls_session_args type
  */
-struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc);
+struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session);
+
+struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc);
 
 void *ast_tcptls_server_root(void *);
 

Modified: team/dvossel/sip_nonblocking_tcp_client/main/tcptls.c
URL: http://svnview.digium.com/svn/asterisk/team/dvossel/sip_nonblocking_tcp_client/main/tcptls.c?view=diff&rev=220533&r1=220532&r2=220533
==============================================================================
--- team/dvossel/sip_nonblocking_tcp_client/main/tcptls.c (original)
+++ team/dvossel/sip_nonblocking_tcp_client/main/tcptls.c Fri Sep 25 12:55:35 2009
@@ -197,6 +197,7 @@
 						ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname);
 						if (peer)
 							X509_free(peer);
+						close(tcptls_session->fd);
 						fclose(tcptls_session->f);
 						ao2_ref(tcptls_session, -1);
 						return NULL;
@@ -357,7 +358,15 @@
 	return __ssl_setup(cfg, 0);
 }
 
-struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc)
+/*! brief\ attempts to connect and start tcptls session, on error the tcptls_session's
+ * ref count is decremented, fd and file are closed, and NULL is returned.
+ */
+struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session)
+{
+	return handle_tls_connection(tcptls_session);
+}
+
+struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc)
 {
 	int flags;
 	int x = 1;
@@ -421,11 +430,6 @@
 		desc->tls_cfg->enabled = 1;
 		__ssl_setup(desc->tls_cfg, 1);
 	}
-
-	/* handle_tls_connection controls the single ref to tcptls_session. If
-	 * tcptls_session returns NULL then the session has been destroyed */
-	if (!(tcptls_session = handle_tls_connection(tcptls_session)))
-		goto error;
 
 	return tcptls_session;
 




More information about the asterisk-commits mailing list