[asterisk-commits] dvossel: trunk r225445 - in /trunk: apps/ channels/ include/asterisk/ main/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Oct 22 14:56:01 CDT 2009


Author: dvossel
Date: Thu Oct 22 14:55:51 2009
New Revision: 225445

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=225445
Log:
SIP TCP/TLS: move client connection setup/write into tcp helper thread, various related locking/memory fixes.

        What this patch fixes
1.Moves sip TCP/TLS connection setup into the TCP helper thread:
  Connection setup takes a while, and before this change it was being
  done while holding the monitor lock.
2.Moves TCP/TLS writing to the TCP helper thread:  Through the
  use of a packet queue and an alert pipe, the TCP helper thread
  can now be woken up to write data as well as read data.
3.Locking error: sip_xmit returned an XMIT_ERROR without giving
  up the tcptls_session lock.  This lock has been completely removed
  from sip_xmit and placed in the new sip_tcptls_write() function.
4.Memory leak:  When creating a tcptls_client the tls_cfg was allocated
  but never freed unless the tcptls_session failed to start.  Now the
  session_args for a sip client are an ao2 object which frees the
  tls_cfg on destruction.
5.Pointer to stack variable: During sip_prepare_socket the creation
  of a client's ast_tcptls_session_args was done on the stack and
  stored as a pointer in the newly created tcptls_session.  Depending
  on the events that followed, there was a slight possibility that
  the pointer could have been accessed after sip_prepare_socket had
  returned and its stack frame was no longer valid.  With the new
  changes, it is always accessed after that function returns, which
  is how I found the problem.

Notable code changes
1.I broke tcptls.c's ast_tcptls_client_start() function into two
  functions.  One for creating and allocating the new tcptls_session,
  and a separate one for starting and handling the new connection.
  This allowed me to create the tcptls_session, launch the helper
  thread, and then establish the connection within the helper thread.
2.Writes to a tcptls_session are now done within the helper thread.
  This is done by using an alert pipe to wake up the thread if new
  data needs to be sent.  The thread's sip_threadinfo object contains
  the alert pipe as well as the packet queue.
3.Since the threadinfo object contains the alert pipe, it must now be
  accessed outside of the helper thread for every write (queuing of a
  packet).  For easy lookup, I moved the threadinfo objects from a
  linked list to an ao2_container.

(closes issue #13136)
Reported by: pabelanger
Tested by: dvossel, whys

(closes issue #15894)
Reported by: dvossel
Tested by: dvossel

Review: https://reviewboard.asterisk.org/r/380/


Modified:
    trunk/apps/app_externalivr.c
    trunk/channels/chan_sip.c
    trunk/include/asterisk/tcptls.h
    trunk/main/tcptls.c

Modified: trunk/apps/app_externalivr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_externalivr.c?view=diff&rev=225445&r1=225444&r2=225445
==============================================================================
--- trunk/apps/app_externalivr.c (original)
+++ trunk/apps/app_externalivr.c Thu Oct 22 14:55:51 2009
@@ -457,9 +457,7 @@
 		ivr_desc.local_address.sin_family = AF_INET;
 		ivr_desc.local_address.sin_port = htons(port);
 		memcpy(&ivr_desc.local_address.sin_addr.s_addr, hp.hp.h_addr, hp.hp.h_length);
-		ser = ast_tcptls_client_start(&ivr_desc);
-
-		if (!ser) {
+		if (!(ser = ast_tcptls_client_create(&ivr_desc)) || !(ser = ast_tcptls_client_start(ser))) {
 			goto exit;
 		}
 		res = eivr_comm(chan, u, ser->fd, ser->fd, -1, pipe_delim_args, flags);

Modified: trunk/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_sip.c?view=diff&rev=225445&r1=225444&r2=225445
==============================================================================
--- trunk/channels/chan_sip.c (original)
+++ trunk/channels/chan_sip.c Thu Oct 22 14:55:51 2009
@@ -2111,13 +2111,26 @@
 	char lastmsg[256];		/*!< Last Message sent/received */
 };
 
+enum sip_tcptls_alert {
+	/*! \brief There is new data to be sent out */
+	TCPTLS_ALERT_DATA,
+	/*! \brief A request to stop the tcp_handler thread */
+	TCPTLS_ALERT_STOP,
+};
+
+struct tcptls_packet {
+	AST_LIST_ENTRY(tcptls_packet) entry;
+	struct ast_str *data;
+	size_t len;
+};
 /*! \brief Definition of a thread that handles a socket */
 struct sip_threadinfo {
 	int stop;
+	int alert_pipe[2]; /*! Used to alert tcptls thread when packet is ready to be written */
 	pthread_t threadid;
 	struct ast_tcptls_session_instance *tcptls_session;
 	enum sip_transport type;	/*!< We keep a copy of the type here so we can display it in the connection list */
-	AST_LIST_ENTRY(sip_threadinfo) list;
+	AST_LIST_HEAD_NOLOCK(, tcptls_packet) packet_q;
 };
 
 /*! \brief Definition of an MWI subscription to another server */
@@ -2151,8 +2164,8 @@
 static int hash_user_size = 563;
 #endif
 
-/*! \brief  The thread list of TCP threads */
-static AST_LIST_HEAD_STATIC(threadl, sip_threadinfo);
+/*! \brief  The table of TCP threads */
+static struct ao2_container *threadt;
 
 /*! \brief  The peer list: Users, Peers and Friends */
 static struct ao2_container *peers;
@@ -2242,6 +2255,21 @@
 
 	/* Now only return a match if the port matches, as well. */
 	return peer->addr.sin_port == peer2->addr.sin_port ? (CMP_MATCH | CMP_STOP) : 0;
+}
+
+
+static int threadt_hash_cb(const void *obj, const int flags)
+{
+	const struct sip_threadinfo *th = obj;
+
+	return (int) th->tcptls_session->remote_address.sin_addr.s_addr;
+}
+
+static int threadt_cmp_cb(void *obj, void *arg, int flags)
+{
+	struct sip_threadinfo *th = obj, *th2 = arg;
+
+	return (th->tcptls_session == th2->tcptls_session) ? CMP_MATCH | CMP_STOP : 0;
 }
 
 /*!
@@ -2890,6 +2918,130 @@
 	return res;
 }
 
+static void tcptls_packet_destructor(void *obj)
+{
+	struct tcptls_packet *packet = obj;
+
+	ast_free(packet->data);
+}
+
+static void sip_tcptls_client_args_destructor(void *obj)
+{
+	struct ast_tcptls_session_args *args = obj;
+	if (args->tls_cfg) {
+		ast_free(args->tls_cfg->certfile);
+		ast_free(args->tls_cfg->pvtfile);
+		ast_free(args->tls_cfg->cipher);
+		ast_free(args->tls_cfg->cafile);
+		ast_free(args->tls_cfg->capath);
+	}
+	ast_free(args->tls_cfg);
+	ast_free((char *) args->name);
+}
+
+static void sip_threadinfo_destructor(void *obj)
+{
+	struct sip_threadinfo *th = obj;
+	struct tcptls_packet *packet;
+	if (th->alert_pipe[1] > -1) {
+		close(th->alert_pipe[0]);
+	}
+	if (th->alert_pipe[1] > -1) {
+		close(th->alert_pipe[1]);
+	}
+	th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+	while ((packet = AST_LIST_REMOVE_HEAD(&th->packet_q, entry))) {
+		ao2_t_ref(packet, -1, "thread destruction, removing packet from frame queue");
+	}
+
+	if (th->tcptls_session) {
+		ao2_t_ref(th->tcptls_session, -1, "remove tcptls_session for sip_threadinfo object");
+	}
+}
+
+/*! \brief creates a sip_threadinfo object and links it into the threadt table. */
+static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session, int transport)
+{
+	struct sip_threadinfo *th;
+
+	if (!tcptls_session || !(th = ao2_alloc(sizeof(*th), sip_threadinfo_destructor))) {
+		return NULL;
+	}
+
+	th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+	if (pipe(th->alert_pipe) == -1) {
+		ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo");
+		ast_log(LOG_ERROR, "Could not create sip alert pipe in tcptls thread, error %s\n", strerror(errno));
+		return NULL;
+	}
+	ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object");
+	th->tcptls_session = tcptls_session;
+	th->type = transport ? transport : (tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP);
+	ao2_t_link(threadt, th, "Adding new tcptls helper thread");
+	ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains");
+	return th;
+}
+
+/*! \brief used to indicate to a tcptls thread that data is ready to be written */
+static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t len)
+{
+	int res = len;
+	struct sip_threadinfo *th = NULL;
+	struct tcptls_packet *packet = NULL;
+	struct sip_threadinfo tmp = {
+		.tcptls_session = tcptls_session,
+	};
+	enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA;
+
+	if (!tcptls_session) {
+		return XMIT_ERROR;
+	}
+
+	ast_mutex_lock(&tcptls_session->lock);
+
+	if ((tcptls_session->fd == -1) ||
+		!(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) ||
+		!(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor)) ||
+		!(packet->data = ast_str_create(len))) {
+		goto tcptls_write_setup_error;
+	}
+
+	/* goto tcptls_write_error should _NOT_ be used beyond this point */
+	ast_str_set(&packet->data, 0, "%s", (char *) buf);
+	packet->len = len;
+
+	/* alert tcptls thread handler that there is a packet to be sent.
+	 * must lock the thread info object to guarantee control of the
+	 * packet queue */
+	ao2_lock(th);
+	if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) {
+		ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno));
+		ao2_t_ref(packet, -1, "could not write to alert pipe, remove packet");
+		packet = NULL;
+		res = XMIT_ERROR;
+	} else { /* it is safe to queue the frame after issuing the alert when we hold the threadinfo lock */
+		AST_LIST_INSERT_TAIL(&th->packet_q, packet, entry);
+	}
+	ao2_unlock(th);
+
+	ast_mutex_unlock(&tcptls_session->lock);
+	ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it");
+	return res;
+
+tcptls_write_setup_error:
+	if (th) {
+		ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet");
+	}
+	if (packet) {
+		ao2_t_ref(packet, -1, "could not allocate packet's data");
+	}
+	ast_mutex_unlock(&tcptls_session->lock);
+
+	return XMIT_ERROR;
+}
+
 /*! \brief SIP TCP connection handler */
 static void *sip_tcp_worker_fn(void *data)
 {
@@ -2905,26 +3057,45 @@
 {
 	int res, cl;
 	struct sip_request req = { 0, } , reqcpy = { 0, };
-	struct sip_threadinfo *me;
+	struct sip_threadinfo *me = NULL;
 	char buf[1024] = "";
-
-	me = ast_calloc(1, sizeof(*me));
-
-	if (!me)
-		goto cleanup2;
+	struct pollfd fds[2] = { { 0 }, { 0 }, };
+	struct ast_tcptls_session_args *ca = NULL;
+
+	/* If this is a server session, then the connection has already been setup,
+	 * simply create the threadinfo object so we can access this thread for writing.
+	 * 
+	 * if this is a client connection more work must be done.
+	 * 1. We own the parent session args for a client connection.  This pointer needs
+	 *    to be held on to so we can decrement it's ref count on thread destruction.
+	 * 2. The threadinfo object was created before this thread was launched, however
+	 *    it must be found within the threadt table.
+	 * 3. Last, the tcptls_session must be started.
+	 */
+	if (!tcptls_session->client) {
+		if (!(me = sip_threadinfo_create(tcptls_session, tcptls_session->ssl ? SIP_TRANSPORT_TLS : SIP_TRANSPORT_TCP))) {
+			goto cleanup;
+		}
+		ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread");
+	} else {
+		struct sip_threadinfo tmp = {
+			.tcptls_session = tcptls_session,
+		};
+
+		if ((!(ca = tcptls_session->parent)) ||
+			(!(me = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) ||
+			(!(tcptls_session = ast_tcptls_client_start(tcptls_session)))) {
+			goto cleanup;
+		}
+	}
 
 	me->threadid = pthread_self();
-	me->tcptls_session = tcptls_session;
-	if (tcptls_session->ssl)
-		me->type = SIP_TRANSPORT_TLS;
-	else
-		me->type = SIP_TRANSPORT_TCP;
-
 	ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
 
-	AST_LIST_LOCK(&threadl);
-	AST_LIST_INSERT_TAIL(&threadl, me, list);
-	AST_LIST_UNLOCK(&threadl);
+	/* set up pollfd to watch for reads on both the socket and the alert_pipe */
+	fds[0].fd = tcptls_session->fd;
+	fds[1].fd = me->alert_pipe[0];
+	fds[0].events = fds[1].events = POLLIN | POLLPRI;
 
 	if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
 		goto cleanup;
@@ -2934,81 +3105,120 @@
 	for (;;) {
 		struct ast_str *str_save;
 
-		str_save = req.data;
-		memset(&req, 0, sizeof(req));
-		req.data = str_save;
-		ast_str_reset(req.data);
-
-		str_save = reqcpy.data;
-		memset(&reqcpy, 0, sizeof(reqcpy));
-		reqcpy.data = str_save;
-		ast_str_reset(reqcpy.data);
-
-		memset(buf, 0, sizeof(buf));
-
-		if (tcptls_session->ssl) {
-			set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
-			req.socket.port = htons(ourport_tls);
-		} else {
-			set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
-			req.socket.port = htons(ourport_tcp);
-		}
-		req.socket.fd = tcptls_session->fd;
-		res = ast_wait_for_input(tcptls_session->fd, -1);
+		res = ast_poll(fds, 2, -1); /* polls for both socket and alert_pipe */
 		if (res < 0) {
 			ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "SSL": "TCP", res);
 			goto cleanup;
 		}
 
-		/* Read in headers one line at a time */
-		while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
-			ast_mutex_lock(&tcptls_session->lock);
-			if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
-				ast_mutex_unlock(&tcptls_session->lock);
-				goto cleanup;
+		/* handle the socket event, check for both reads from the socket fd,
+		 * and writes from alert_pipe fd */
+		if (fds[0].revents) { /* there is data on the socket to be read */
+
+			fds[0].revents = 0;
+
+			/* clear request structure */
+			str_save = req.data;
+			memset(&req, 0, sizeof(req));
+			req.data = str_save;
+			ast_str_reset(req.data);
+
+			str_save = reqcpy.data;
+			memset(&reqcpy, 0, sizeof(reqcpy));
+			reqcpy.data = str_save;
+			ast_str_reset(reqcpy.data);
+
+			memset(buf, 0, sizeof(buf));
+
+			if (tcptls_session->ssl) {
+				set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
+				req.socket.port = htons(ourport_tls);
+			} else {
+				set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
+				req.socket.port = htons(ourport_tcp);
 			}
-			ast_mutex_unlock(&tcptls_session->lock);
-			if (me->stop)
-				 goto cleanup;
-			ast_str_append(&req.data, 0, "%s", buf);
-			req.len = req.data->used;
-		}
-		copy_request(&reqcpy, &req);
-		parse_request(&reqcpy);
-		/* In order to know how much to read, we need the content-length header */
-		if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
-			while (cl > 0) {
-				size_t bytes_read;
+			req.socket.fd = tcptls_session->fd;
+
+			/* Read in headers one line at a time */
+			while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
 				ast_mutex_lock(&tcptls_session->lock);
-				if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+				if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
 					ast_mutex_unlock(&tcptls_session->lock);
 					goto cleanup;
 				}
-				buf[bytes_read] = '\0';
 				ast_mutex_unlock(&tcptls_session->lock);
 				if (me->stop)
-					goto cleanup;
-				cl -= strlen(buf);
+					 goto cleanup;
 				ast_str_append(&req.data, 0, "%s", buf);
 				req.len = req.data->used;
 			}
-		}
-		/*! \todo XXX If there's no Content-Length or if the content-length and what
-				we receive is not the same - we should generate an error */
-
-		req.socket.tcptls_session = tcptls_session;
-		handle_request_do(&req, &tcptls_session->remote_address);
-	}
+			copy_request(&reqcpy, &req);
+			parse_request(&reqcpy);
+			/* In order to know how much to read, we need the content-length header */
+			if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
+				while (cl > 0) {
+					size_t bytes_read;
+					ast_mutex_lock(&tcptls_session->lock);
+					if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+						ast_mutex_unlock(&tcptls_session->lock);
+						goto cleanup;
+					}
+					buf[bytes_read] = '\0';
+					ast_mutex_unlock(&tcptls_session->lock);
+					if (me->stop)
+						goto cleanup;
+					cl -= strlen(buf);
+					ast_str_append(&req.data, 0, "%s", buf);
+					req.len = req.data->used;
+				}
+			}
+			/*! \todo XXX If there's no Content-Length or if the content-length and what
+					we receive is not the same - we should generate an error */
+
+			req.socket.tcptls_session = tcptls_session;
+			handle_request_do(&req, &tcptls_session->remote_address);
+		}
+
+		if (fds[1].revents) { /* alert_pipe indicates there is data in the send queue to be sent */
+			enum sip_tcptls_alert alert;
+			struct tcptls_packet *packet;
+
+			fds[1].revents = 0;
+
+			if (read(me->alert_pipe[0], &alert, sizeof(alert)) == -1) {
+				ast_log(LOG_ERROR, "read() failed: %s\n", strerror(errno));
+				continue;
+			}
+
+			switch (alert) {
+			case TCPTLS_ALERT_STOP:
+				goto cleanup;
+			case TCPTLS_ALERT_DATA:
+				ao2_lock(me);
+				if (!(packet = AST_LIST_REMOVE_HEAD(&me->packet_q, entry))) {
+					ast_log(LOG_WARNING, "TCPTLS thread alert_pipe indicated packet should be sent, but frame_q is empty");
+				} else if (ast_tcptls_server_write(tcptls_session, ast_str_buffer(packet->data), packet->len) == -1) {
+					ast_log(LOG_WARNING, "Failure to write to tcp/tls socket\n");
+				}
+
+				if (packet) {
+					ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed");
+				}
+				ao2_unlock(me);
+				break;
+			default:
+				ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert);
+			}
+		}
+	}
+
+	ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
 
 cleanup:
-	AST_LIST_LOCK(&threadl);
-	AST_LIST_REMOVE(&threadl, me, list);
-	AST_LIST_UNLOCK(&threadl);
-	ast_free(me);
-cleanup2:
-	fclose(tcptls_session->f);
-	tcptls_session->f = NULL;
-	tcptls_session->fd = -1;
+	if (me) {
+		ao2_t_unlink(threadt, me, "Removing tcptls helper thread, thread is closing");
+		ao2_t_ref(me, -1, "Removing tcp_helper_threads threadinfo ref");
+	}
 	if (reqcpy.data) {
 		ast_free(reqcpy.data);
 	}
@@ -3018,12 +3228,27 @@
 		req.data = NULL;
 	}
 
-	ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
-	
-
-	ao2_ref(tcptls_session, -1);
-	tcptls_session = NULL;
-
+	/* if client, we own the parent session arguments and must decrement ref */
+	if (ca) {
+		ao2_t_ref(ca, -1, "closing tcptls thread, getting rid of client tcptls_session arguments");
+	}
+
+	if (tcptls_session) {
+		ast_mutex_lock(&tcptls_session->lock);
+		if (tcptls_session->f) {
+			fclose(tcptls_session->f);
+			tcptls_session->f = NULL;
+		}
+		if (tcptls_session->fd != -1) {
+			close(tcptls_session->fd);
+			tcptls_session->fd = -1;
+		}
+		tcptls_session->parent = NULL;
+		ast_mutex_unlock(&tcptls_session->lock);
+
+		ao2_ref(tcptls_session, -1);
+		tcptls_session = NULL;
+	}
 	return NULL;
 }
 
@@ -3480,25 +3705,14 @@
 	if (sip_prepare_socket(p) < 0)
 		return XMIT_ERROR;
 
-	if (p->socket.tcptls_session)
-		ast_mutex_lock(&p->socket.tcptls_session->lock);
-
-	if (p->socket.type & SIP_TRANSPORT_UDP) {
+	if (p->socket.type == SIP_TRANSPORT_UDP) {
 		res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in));
 	} else if (p->socket.tcptls_session) {
-		if (p->socket.tcptls_session->f) {
-			res = ast_tcptls_server_write(p->socket.tcptls_session, data->str, len);
-		} else {
-			ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len);
-			return XMIT_ERROR;
-		}
+		res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
 	} else {
 		ast_debug(2, "Socket type is TCP but no tcptls_session is present to write to\n");
 		return XMIT_ERROR;
 	}
-
-	if (p->socket.tcptls_session)
-		ast_mutex_unlock(&p->socket.tcptls_session->lock);
 
 	if (res == -1) {
 		switch (errno) {
@@ -12233,6 +12447,11 @@
 	destroy_association(peer);	/* remove registration data from storage */
 	set_socket_transport(&peer->socket, peer->default_outbound_transport);
 
+	if (peer->socket.tcptls_session) {
+		ao2_ref(peer->socket.tcptls_session, -1);
+		peer->socket.tcptls_session = NULL;
+	}
+
 	manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: SIP\r\nPeer: SIP/%s\r\nPeerStatus: Unregistered\r\nCause: Expired\r\n", peer->name);
 	register_peer_exten(peer, FALSE);	/* Remove regexten */
 	ast_devstate_changed(AST_DEVICE_UNKNOWN, "SIP/%s", peer->name);
@@ -14732,6 +14951,7 @@
 static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	struct sip_threadinfo *th;
+	struct ao2_iterator i;
 
 #define FORMAT2 "%-30.30s %3.6s %9.9s %6.6s\n"
 #define FORMAT  "%-30.30s %-6d %-9.9s %-6.6s\n"
@@ -14751,15 +14971,16 @@
 		return CLI_SHOWUSAGE;
 
 	ast_cli(a->fd, FORMAT2, "Host", "Port", "Transport", "Type");
-	AST_LIST_LOCK(&threadl);
-	AST_LIST_TRAVERSE(&threadl, th, list) {
+
+	i = ao2_iterator_init(threadt, 0);
+	while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) {
 		ast_cli(a->fd, FORMAT, ast_inet_ntoa(th->tcptls_session->remote_address.sin_addr),
 			ntohs(th->tcptls_session->remote_address.sin_port),
 			get_transport(th->type),
 			(th->tcptls_session->client ? "Client" : "Server"));
-
-	}
-	AST_LIST_UNLOCK(&threadl);
+		ao2_t_ref(th, -1, "decrement ref from iterator");
+	}
+
 	return CLI_SUCCESS;
 #undef FORMAT
 #undef FORMAT2
@@ -22678,6 +22899,18 @@
 		return port == STANDARD_SIP_PORT;
 }
 
+static int threadinfo_locate_cb(void *obj, void *arg, int flags)
+{
+	struct sip_threadinfo *th = obj;
+	struct sockaddr_in *s = arg;
+
+	if (!inaddrcmp(&th->tcptls_session->remote_address, s)) {
+		return CMP_MATCH | CMP_STOP;
+	}
+
+	return 0;
+}
+
 /*!
  * \brief Find thread for TCP/TLS session (based on IP/Port
  *
@@ -22688,16 +22921,10 @@
 	struct sip_threadinfo *th;
 	struct ast_tcptls_session_instance *tcptls_instance = NULL;
 
-	AST_LIST_LOCK(&threadl);
-	AST_LIST_TRAVERSE(&threadl, th, list) {
-		if ((s->sin_family == th->tcptls_session->remote_address.sin_family) &&
-			(s->sin_addr.s_addr == th->tcptls_session->remote_address.sin_addr.s_addr) &&
-			(s->sin_port == th->tcptls_session->remote_address.sin_port))  {
-				tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session);
-				break;
-			}
-	}
-	AST_LIST_UNLOCK(&threadl);
+	if ((th = ao2_callback(threadt, 0, threadinfo_locate_cb, s))) {
+		tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session);
+		ao2_t_ref(th, -1, "decrement ref from callback");
+	}
 
 	return tcptls_instance;
 }
@@ -22707,14 +22934,23 @@
 {
 	struct sip_socket *s = &p->socket;
 	static const char name[] = "SIP socket";
+	struct sip_threadinfo *th;
 	struct ast_tcptls_session_instance *tcptls_session;
-	struct ast_tcptls_session_args ca = {
+	struct ast_tcptls_session_args tmp_ca = {
 		.name = name,
 		.accept_fd = -1,
 	};
-
-	if (s->fd != -1)
-		return s->fd;	/* This socket is already active */
+	struct ast_tcptls_session_args *ca;
+
+	/* check to see if a socket is already active */
+	if ((s->fd != -1) && (s->type == SIP_TRANSPORT_UDP)) {
+		return s->fd;
+	}
+	if ((s->type & (SIP_TRANSPORT_TCP | SIP_TRANSPORT_TLS)) &&
+			(s->tcptls_session) &&
+			(s->tcptls_session->fd != -1)) {
+		return s->tcptls_session->fd;
+	}
 
 	/*! \todo Check this... This might be wrong, depending on the proxy configuration
 		If proxy is in "force" mode its correct.
@@ -22723,14 +22959,23 @@
 		s->type = p->outboundproxy->transport;
 	}
 
-	if (s->type & SIP_TRANSPORT_UDP) {
+	if (s->type == SIP_TRANSPORT_UDP) {
 		s->fd = sipsock;
 		return s->fd;
 	}
 
-	ca.remote_address = *(sip_real_dst(p));
-
-	if ((tcptls_session = sip_tcp_locate(&ca.remote_address))) {	/* Check if we have a thread handling a socket connected to this IP/port */
+	/* At this point we are dealing with a TCP/TLS connection
+	 * 1. We need to check to see if a connectin thread exists
+	 *    for this address, if so use that.
+	 * 2. If a thread does not exist for this address, but the tcptls_session
+	 *    exists on the socket, the connection was closed.
+	 * 3. If no tcptls_session thread exists for the address, and no tcptls_session
+	 *    already exists on the socket, create a new one and launch a new thread.
+	 */
+
+	/* 1.  check for existing threads */
+	tmp_ca.remote_address = *(sip_real_dst(p));
+	if ((tcptls_session = sip_tcp_locate(&tmp_ca.remote_address))) {
 		s->fd = tcptls_session->fd;
 		if (s->tcptls_session) {
 			ao2_ref(s->tcptls_session, -1);
@@ -22738,46 +22983,82 @@
 		}
 		s->tcptls_session = tcptls_session;
 		return s->fd;
-	}
-
-	if (s->tcptls_session && s->tcptls_session->parent->tls_cfg) {
-		ca.tls_cfg = s->tcptls_session->parent->tls_cfg;
-	} else {
-		if (s->type & SIP_TRANSPORT_TLS) {
-			ca.tls_cfg = ast_calloc(1, sizeof(*ca.tls_cfg));
-			if (!ca.tls_cfg)
-				return -1;
-			memcpy(ca.tls_cfg, &default_tls_cfg, sizeof(*ca.tls_cfg));
-			if (!ast_strlen_zero(p->tohost))
-				ast_copy_string(ca.hostname, p->tohost, sizeof(ca.hostname));
-		}
-	}
-	
+	/* 2.  Thread not found, if tcptls_session already exists, it once had a thread and is now terminated */
+	} else if (s->tcptls_session) {
+		return s->fd; /* XXX whether reconnection is ever necessary here needs to be investigated further */
+	}
+
+	/* 3.  Create a new TCP/TLS client connection */
+	/* create new session arguments for the client connection */
+	if (!(ca = ao2_alloc(sizeof(*ca), sip_tcptls_client_args_destructor)) ||
+		!(ca->name = ast_strdup(name))) {
+		goto create_tcptls_session_fail;
+	}
+	ca->accept_fd = -1;
+	ca->remote_address = *(sip_real_dst(p));
+	/* if type is TLS, we need to create a tls cfg for this session arg */
+	if (s->type == SIP_TRANSPORT_TLS) {
+		if (!(ca->tls_cfg = ast_calloc(1, sizeof(*ca->tls_cfg)))) {
+			goto create_tcptls_session_fail;
+		}
+		memcpy(ca->tls_cfg, &default_tls_cfg, sizeof(*ca->tls_cfg));
+
+		if (!(ca->tls_cfg->certfile = ast_strdup(default_tls_cfg.certfile)) ||
+			!(ca->tls_cfg->pvtfile = ast_strdup(default_tls_cfg.pvtfile)) ||
+			!(ca->tls_cfg->cipher = ast_strdup(default_tls_cfg.cipher)) ||
+			!(ca->tls_cfg->cafile = ast_strdup(default_tls_cfg.cafile)) ||
+			!(ca->tls_cfg->capath = ast_strdup(default_tls_cfg.capath))) {
+
+			goto create_tcptls_session_fail;
+		}
+
+		/* this host is used as the common name in ssl/tls */
+		if (!ast_strlen_zero(p->tohost)) {
+			ast_copy_string(ca->hostname, p->tohost, sizeof(ca->hostname));
+		}
+	}
+
+	/* Create a client connection for address, this does not start the connection, just sets it up. */
+	if (!(s->tcptls_session = ast_tcptls_client_create(ca))) {
+		goto create_tcptls_session_fail;
+	}
+
+	s->fd = s->tcptls_session->fd;
+
+	/* client connections need to have the sip_threadinfo object created before
+	 * the thread is detached.  This ensures the alert_pipe is up before it will
+	 * be used.  Note that this function links the new threadinfo object into the
+	 * threadt container. */
+	if (!(th = sip_threadinfo_create(s->tcptls_session, s->type))) {
+		goto create_tcptls_session_fail;
+	}
+
+	/* Give the new thread a reference to the tcptls_session */
+	ao2_ref(s->tcptls_session, +1);
+
+	if (ast_pthread_create_background(&ca->master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
+		ast_debug(1, "Unable to launch '%s'.", ca->name);
+		ao2_ref(s->tcptls_session, -1); /* take away the thread ref we just gave it */
+		goto create_tcptls_session_fail;
+	}
+
+	return s->fd;
+
+create_tcptls_session_fail:
+	if (ca) {
+		ao2_t_ref(ca, -1, "failed to create client, getting rid of client tcptls_session arguments");
+	}
 	if (s->tcptls_session) {
-		/* the pvt socket already has a server instance ... */
-	} else {
-		s->tcptls_session = ast_tcptls_client_start(&ca); /* Start a client connection to this address */
-	}
-
-	if (!s->tcptls_session) {
-		if (ca.tls_cfg)
-			ast_free(ca.tls_cfg);
-		return -1;
-	}
-
-	s->fd = ca.accept_fd;
-
-	/* Give the new thread a reference */
-	ao2_ref(s->tcptls_session, +1);
-
-	if (ast_pthread_create_background(&ca.master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
-		ast_debug(1, "Unable to launch '%s'.", ca.name);
+		close(tcptls_session->fd);
+		s->fd = tcptls_session->fd = -1;
 		ao2_ref(s->tcptls_session, -1);
-		close(ca.accept_fd);
-		s->fd = ca.accept_fd = -1;
-	}
-
-	return s->fd;
+		s->tcptls_session = NULL;
+	}
+	if (th) {
+		ao2_t_unlink(threadt, th, "Removing tcptls thread info object, thread failed to open");
+	}
+
+	return -1;
 }
 
 /*!
@@ -26362,6 +26643,7 @@
 	peers = ao2_t_container_alloc(hash_peer_size, peer_hash_cb, peer_cmp_cb, "allocate peers");
 	peers_by_ip = ao2_t_container_alloc(hash_peer_size, peer_iphash_cb, peer_ipcmp_cb, "allocate peers_by_ip");
 	dialogs = ao2_t_container_alloc(hash_dialog_size, dialog_hash_cb, dialog_cmp_cb, "allocate dialogs");
+	threadt = ao2_t_container_alloc(hash_dialog_size, threadt_hash_cb, threadt_cmp_cb, "allocate threadt table");
 	
 	ASTOBJ_CONTAINER_INIT(&regl); /* Registry object list -- not searched for anything */
 	ASTOBJ_CONTAINER_INIT(&submwil); /* MWI subscription object list */
@@ -26492,17 +26774,15 @@
 		ast_tcptls_server_stop(&sip_tls_desc);
 
 	/* Kill all existing TCP/TLS threads */
-	AST_LIST_LOCK(&threadl);
-	AST_LIST_TRAVERSE_SAFE_BEGIN(&threadl, th, list) {
+	i = ao2_iterator_init(threadt, 0);
+	while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) {
 		pthread_t thread = th->threadid;
 		th->stop = 1;
-		AST_LIST_UNLOCK(&threadl);
 		pthread_kill(thread, SIGURG);
 		pthread_join(thread, NULL);
-		AST_LIST_LOCK(&threadl);
-	}
-	AST_LIST_TRAVERSE_SAFE_END;
-	AST_LIST_UNLOCK(&threadl);
+		ao2_t_ref(th, -1, "decrement ref from iterator");
+	}
+	ao2_iterator_destroy(&i);
 
 	/* Hangup all dialogs if they have an owner */
 	i = ao2_iterator_init(dialogs, 0);
@@ -26555,6 +26835,7 @@
 	ao2_t_ref(peers, -1, "unref the peers table");
 	ao2_t_ref(peers_by_ip, -1, "unref the peers_by_ip table");
 	ao2_t_ref(dialogs, -1, "unref the dialogs table");
+	ao2_t_ref(threadt, -1, "unref the thread table");
 
 	clear_sip_domains();
 	close(sipsock);

Modified: trunk/include/asterisk/tcptls.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/tcptls.h?view=diff&rev=225445&r1=225444&r2=225445
==============================================================================
--- trunk/include/asterisk/tcptls.h (original)
+++ trunk/include/asterisk/tcptls.h Thu Oct 22 14:55:51 2009
@@ -156,12 +156,14 @@
 #define LEN_T size_t
 #endif
 
-/*!
- * \brief A generic client routine for a TCP client
- * and starts a thread for handling accept()
- * \version 1.6.1 changed desc parameter to be of ast_tcptls_session_args type
- */
-struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc);
+/*! 
+  * \brief attempts to connect and start tcptls session, on error the tcptls_session's
+  * ref count is decremented, fd and file are closed, and NULL is returned.
+  */
+struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session);
+
+/* \brief Creates a client connection's ast_tcptls_session_instance. */
+struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc);
 
 void *ast_tcptls_server_root(void *);
 

Modified: trunk/main/tcptls.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/tcptls.c?view=diff&rev=225445&r1=225444&r2=225445
==============================================================================
--- trunk/main/tcptls.c (original)
+++ trunk/main/tcptls.c Thu Oct 22 14:55:51 2009
@@ -125,7 +125,7 @@
 *
 * \note must decrement ref count before returning NULL on error
 */
-static void *handle_tls_connection(void *data)
+static void *handle_tcptls_connection(void *data)
 {
 	struct ast_tcptls_session_instance *tcptls_session = data;
 #ifdef DO_SSL
@@ -197,6 +197,7 @@
 						ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname);
 						if (peer)
 							X509_free(peer);
+						close(tcptls_session->fd);
 						fclose(tcptls_session->f);
 						ao2_ref(tcptls_session, -1);
 						return NULL;
@@ -266,7 +267,7 @@
 		tcptls_session->client = 0;
 
 		/* This thread is now the only place that controls the single ref to tcptls_session */
-		if (ast_pthread_create_detached_background(&launched, NULL, handle_tls_connection, tcptls_session)) {
+		if (ast_pthread_create_detached_background(&launched, NULL, handle_tcptls_connection, tcptls_session)) {
 			ast_log(LOG_WARNING, "Unable to launch helper thread: %s\n", strerror(errno));
 			close(tcptls_session->fd);
 			ao2_ref(tcptls_session, -1);
@@ -357,9 +358,45 @@
 	return __ssl_setup(cfg, 0);
 }
 
-struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc)
-{
+struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session)
+{
+	struct ast_tcptls_session_args *desc;
 	int flags;
+
+	if (!(desc = tcptls_session->parent)) {
+		goto client_start_error;
+	}
+
+	if (connect(desc->accept_fd, (const struct sockaddr *) &desc->remote_address, sizeof(desc->remote_address))) {
+		ast_log(LOG_ERROR, "Unable to connect %s to %s:%d: %s\n",
+			desc->name,
+			ast_inet_ntoa(desc->remote_address.sin_addr), ntohs(desc->remote_address.sin_port),
+			strerror(errno));
+		goto client_start_error;
+	}
+
+	flags = fcntl(desc->accept_fd, F_GETFL);
+	fcntl(desc->accept_fd, F_SETFL, flags & ~O_NONBLOCK);
+
+	if (desc->tls_cfg) {
+		desc->tls_cfg->enabled = 1;
+		__ssl_setup(desc->tls_cfg, 1);
+	}
+
+	return handle_tcptls_connection(tcptls_session);
+
+client_start_error:
+	close(desc->accept_fd);
+	desc->accept_fd = -1;
+	if (tcptls_session) {
+		ao2_ref(tcptls_session, -1);
+	}
+	return NULL;
+
+}
+
+struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc)
+{
 	int x = 1;
 	struct ast_tcptls_session_instance *tcptls_session = NULL;
 
@@ -394,38 +431,15 @@
 		}
 	}
 
-	if (connect(desc->accept_fd, (const struct sockaddr *) &desc->remote_address, sizeof(desc->remote_address))) {
-		ast_log(LOG_ERROR, "Unable to connect %s to %s:%d: %s\n",
-			desc->name,
-			ast_inet_ntoa(desc->remote_address.sin_addr), ntohs(desc->remote_address.sin_port),
-			strerror(errno));
-		goto error;
-	}
-
 	if (!(tcptls_session = ao2_alloc(sizeof(*tcptls_session), session_instance_destructor)))
 		goto error;
 
 	ast_mutex_init(&tcptls_session->lock);
-
-	flags = fcntl(desc->accept_fd, F_GETFL);
-	fcntl(desc->accept_fd, F_SETFL, flags & ~O_NONBLOCK);
-
+	tcptls_session->client = 1;
 	tcptls_session->fd = desc->accept_fd;
 	tcptls_session->parent = desc;
 	tcptls_session->parent->worker_fn = NULL;
 	memcpy(&tcptls_session->remote_address, &desc->remote_address, sizeof(tcptls_session->remote_address));
-
-	tcptls_session->client = 1;
-
-	if (desc->tls_cfg) {
-		desc->tls_cfg->enabled = 1;
-		__ssl_setup(desc->tls_cfg, 1);
-	}
-
-	/* handle_tls_connection controls the single ref to tcptls_session. If
-	 * tcptls_session returns NULL then the session has been destroyed */
-	if (!(tcptls_session = handle_tls_connection(tcptls_session)))
-		goto error;
 
 	return tcptls_session;
 




More information about the asterisk-commits mailing list