[asterisk-commits] file: branch 12 r423151 - in /branches/12: ./ res/res_rtp_asterisk.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Sep 16 06:09:28 CDT 2014


Author: file
Date: Tue Sep 16 06:09:25 2014
New Revision: 423151

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=423151
Log:
res_rtp_asterisk: Fix a myriad of TURN client issues.

1. The number of file descriptors an ioqueue instance can handle is fixed, so we
now spawn the required number to handle the load.
2. Our transport identifiers were exceeding the range supported by pjnath.
3. The TURN client did not set up client binding causing needless bandwidth usage.
4. The code no longer updates address information on each packet.
5. STUN traffic was getting looped back to Asterisk instead of going through the
TURN server.
6. Synchronization now ensures things are completely setup or destroyed.
7. Logging now reflects the target the TURN server is sending to/receiving from
on our behalf.

ASTERISK-23577 #close
Reported by: Jay Jideliov

ASTERISK-23634 #close
Reported by: Roman Skvirsky

Review: https://reviewboard.asterisk.org/r/3982/
........

Merged revisions 423150 from http://svn.asterisk.org/svn/asterisk/branches/11

Modified:
    branches/12/   (props changed)
    branches/12/res/res_rtp_asterisk.c

Propchange: branches/12/
------------------------------------------------------------------------------
Binary property 'branch-11-merged' - no diff available.

Modified: branches/12/res/res_rtp_asterisk.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_rtp_asterisk.c?view=diff&rev=423151&r1=423150&r2=423151
==============================================================================
--- branches/12/res/res_rtp_asterisk.c (original)
+++ branches/12/res/res_rtp_asterisk.c Tue Sep 16 06:09:25 2014
@@ -81,9 +81,9 @@
 #define MINIMUM_RTP_PORT 1024 /*!< Minimum port number to accept */
 #define MAXIMUM_RTP_PORT 65535 /*!< Maximum port number to accept */
 
-#define DEFAULT_TURN_PORT 34780
-
-#define TURN_ALLOCATION_WAIT_TIME 2000
+#define DEFAULT_TURN_PORT 3478
+
+#define TURN_STATE_WAIT_TIME 2000
 
 #define RTCP_PT_FUR     192
 #define RTCP_PT_SR      AST_RTP_RTCP_SR
@@ -147,20 +147,39 @@
 /*! \brief Pool factory used by pjlib to allocate memory. */
 static pj_caching_pool cachingpool;
 
-/*! \brief Pool used by pjlib functions which require memory allocation. */
+/*! \brief Global memory pool for configuration and timers */
 static pj_pool_t *pool;
 
-/*! \brief I/O queue for TURN relay traffic */
-static pj_ioqueue_t *ioqueue;
-
-/*! \brief Timer heap for ICE and TURN stuff */
-static pj_timer_heap_t *timerheap;
-
-/*! \brief Worker thread for ICE/TURN */
-static pj_thread_t *thread;
-
-/*! \brief Notification that the ICE/TURN worker thread should stop */
-static int worker_terminate;
+/*! \brief Global timer heap */
+static pj_timer_heap_t *timer_heap;
+
+/*! \brief Thread executing the timer heap */
+static pj_thread_t *timer_thread;
+
+/*! \brief Used to tell the timer thread to terminate */
+static int timer_terminate;
+
+/*! \brief Structure which contains ioqueue thread information */
+struct ast_rtp_ioqueue_thread {
+	/*! \brief Pool used by the thread */
+	pj_pool_t *pool;
+	/*! \brief The thread handling the queue and timer heap */
+	pj_thread_t *thread;
+	/*! \brief Ioqueue which polls on sockets */
+	pj_ioqueue_t *ioqueue;
+	/*! \brief Timer heap for scheduled items */
+	pj_timer_heap_t *timerheap;
+	/*! \brief Termination request */
+	int terminate;
+	/*! \brief Current number of descriptors being waited on */
+	unsigned int count;
+	/*! \brief Linked list information */
+	AST_LIST_ENTRY(ast_rtp_ioqueue_thread) next;
+};
+
+/*! \brief List of ioqueue threads */
+static AST_LIST_HEAD_STATIC(ioqueues, ast_rtp_ioqueue_thread);
+
 #endif
 
 #define FLAG_3389_WARNING               (1 << 0)
@@ -170,10 +189,10 @@
 #define FLAG_NEED_MARKER_BIT            (1 << 3)
 #define FLAG_DTMF_COMPENSATE            (1 << 4)
 
-#define TRANSPORT_SOCKET_RTP 1
-#define TRANSPORT_SOCKET_RTCP 2
-#define TRANSPORT_TURN_RTP 3
-#define TRANSPORT_TURN_RTCP 4
+#define TRANSPORT_SOCKET_RTP 0
+#define TRANSPORT_SOCKET_RTCP 1
+#define TRANSPORT_TURN_RTP 2
+#define TRANSPORT_TURN_RTCP 3
 
 /*! \brief RTP learning mode tracking information */
 struct rtp_learning_info {
@@ -274,7 +293,13 @@
 	pj_turn_sock *turn_rtcp;    /*!< RTCP TURN relay */
 	pj_turn_state_t turn_state; /*!< Current state of the TURN relay session */
 	unsigned int passthrough:1; /*!< Bit to indicate that the received packet should be passed through */
+	unsigned int rtp_passthrough:1; /*!< Bit to indicate that TURN RTP should be passed through */
+	unsigned int rtcp_passthrough:1; /*!< Bit to indicate that TURN RTCP should be passed through */
 	unsigned int ice_port;      /*!< Port that ICE was started with if it was previously started */
+	struct ast_sockaddr rtp_loop; /*!< Loopback address for forwarding RTP from TURN */
+	struct ast_sockaddr rtcp_loop; /*!< Loopback address for forwarding RTCP from TURN */
+
+	struct ast_rtp_ioqueue_thread *ioqueue; /*!< The ioqueue thread handling us */
 
 	char remote_ufrag[256];  /*!< The remote ICE username */
 	char remote_passwd[256]; /*!< The remote ICE password */
@@ -421,10 +446,11 @@
 
 static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *ice, int use_srtp);
 
+#ifdef HAVE_PJPROJECT
 /*! \brief Helper function which updates an ast_sockaddr with the candidate used for the component */
-static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component, struct ast_sockaddr *cand_address)
-{
-#ifdef HAVE_PJPROJECT
+static void update_address_with_ice_candidate(struct ast_rtp *rtp, enum ast_rtp_ice_component_type component,
+	struct ast_sockaddr *cand_address)
+{
 	char address[PJ_INET6_ADDRSTRLEN];
 
 	if (!rtp->ice || (component < 1) || !rtp->ice->comp[component - 1].valid_check) {
@@ -433,10 +459,20 @@
 
 	ast_sockaddr_parse(cand_address, pj_sockaddr_print(&rtp->ice->comp[component - 1].valid_check->rcand->addr, address, sizeof(address), 0), 0);
 	ast_sockaddr_set_port(cand_address, pj_sockaddr_get_port(&rtp->ice->comp[component - 1].valid_check->rcand->addr));
-#endif
-}
-
-#ifdef HAVE_PJPROJECT
+}
+
+/*! \brief Helper function which sets up channel binding on a TURN session if applicable */
+static void turn_enable_bind_channel(struct ast_rtp *rtp, pj_turn_sock *turn, int component, int transport)
+{
+	if (!rtp->ice || !turn || (component < 1) || !rtp->ice->comp[component - 1].valid_check ||
+		(rtp->ice->comp[component - 1].valid_check->lcand->transport_id != transport)) {
+		return;
+	}
+
+	pj_turn_sock_bind_channel(turn, &rtp->ice->comp[component - 1].valid_check->rcand->addr,
+		sizeof(rtp->ice->comp[component - 1].valid_check->rcand->addr));
+}
+
 /*! \brief Destructor for locally created ICE candidates */
 static void ast_rtp_ice_candidate_destroy(void *obj)
 {
@@ -667,7 +703,7 @@
 	if (pj_ice_sess_create_check_list(rtp->ice, &ufrag, &passwd, ao2_container_count(rtp->ice_active_remote_candidates), &candidates[0]) == PJ_SUCCESS) {
 		ast_test_suite_event_notify("ICECHECKLISTCREATE", "Result: SUCCESS");
 		pj_ice_sess_start_check(rtp->ice);
-		pj_timer_heap_poll(timerheap, NULL);
+		pj_timer_heap_poll(timer_heap, NULL);
 		rtp->strict_rtp_state = STRICT_RTP_OPEN;
 		return;
 	}
@@ -791,6 +827,335 @@
 
 	ao2_link(rtp->ice_local_candidates, candidate);
 	ao2_ref(candidate, -1);
+}
+
+static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
+{
+	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	pj_status_t status;
+
+	status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP, pkt, pkt_len, peer_addr,
+		addr_len);
+	if (status != PJ_SUCCESS) {
+		char buf[100];
+
+		pj_strerror(status, buf, sizeof(buf));
+		ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+			(int)status, buf);
+		return;
+	}
+	if (!rtp->rtp_passthrough) {
+		return;
+	}
+	rtp->rtp_passthrough = 0;
+
+	ast_sendto(rtp->s, pkt, pkt_len, 0, &rtp->rtp_loop);
+}
+
+static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
+{
+	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+	struct ast_rtp *rtp;
+
+	/* If this is a leftover from an already notified RTP instance just ignore the state change */
+	if (!instance) {
+		return;
+	}
+
+	rtp = ast_rtp_instance_get_data(instance);
+
+	/* We store the new state so the other thread can actually handle it */
+	ast_mutex_lock(&rtp->lock);
+	rtp->turn_state = new_state;
+	ast_cond_signal(&rtp->cond);
+
+	if (new_state == PJ_TURN_STATE_DESTROYING) {
+		pj_turn_sock_set_user_data(rtp->turn_rtp, NULL);
+		rtp->turn_rtp = NULL;
+	}
+
+	ast_mutex_unlock(&rtp->lock);
+}
+
+/* RTP TURN Socket interface declaration */
+static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
+	.on_rx_data = ast_rtp_on_turn_rx_rtp_data,
+	.on_state = ast_rtp_on_turn_rtp_state,
+};
+
+static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
+{
+	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	pj_status_t status;
+
+	status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP, pkt, pkt_len, peer_addr,
+		addr_len);
+	if (status != PJ_SUCCESS) {
+		char buf[100];
+
+		pj_strerror(status, buf, sizeof(buf));
+		ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+			(int)status, buf);
+		return;
+	}
+	if (!rtp->rtcp_passthrough) {
+		return;
+	}
+	rtp->rtcp_passthrough = 0;
+
+	ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp_loop);
+}
+
+static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
+{
+	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+	struct ast_rtp *rtp = NULL;
+
+	/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
+	if (!instance) {
+		return;
+	}
+
+	rtp = ast_rtp_instance_get_data(instance);
+
+	/* We store the new state so the other thread can actually handle it */
+	ast_mutex_lock(&rtp->lock);
+	rtp->turn_state = new_state;
+	ast_cond_signal(&rtp->cond);
+
+	if (new_state == PJ_TURN_STATE_DESTROYING) {
+		pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL);
+		rtp->turn_rtcp = NULL;
+	}
+
+	ast_mutex_unlock(&rtp->lock);
+}
+
+/* RTCP TURN Socket interface declaration */
+static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
+	.on_rx_data = ast_rtp_on_turn_rx_rtcp_data,
+	.on_state = ast_rtp_on_turn_rtcp_state,
+};
+
+/*! \brief Worker thread for ioqueue and timerheap */
+static int ioqueue_worker_thread(void *data)
+{
+	struct ast_rtp_ioqueue_thread *ioqueue = data;
+
+	while (!ioqueue->terminate) {
+		const pj_time_val delay = {0, 10};
+
+		pj_ioqueue_poll(ioqueue->ioqueue, &delay);
+
+		pj_timer_heap_poll(ioqueue->timerheap, NULL);
+	}
+
+	return 0;
+}
+
+/*! \brief Destroyer for ioqueue thread */
+static void rtp_ioqueue_thread_destroy(struct ast_rtp_ioqueue_thread *ioqueue)
+{
+	if (ioqueue->thread) {
+		ioqueue->terminate = 1;
+		pj_thread_join(ioqueue->thread);
+		pj_thread_destroy(ioqueue->thread);
+	}
+
+	pj_pool_release(ioqueue->pool);
+	ast_free(ioqueue);
+}
+
+/*! \brief Removal function for ioqueue thread, determines if it should be terminated and destroyed */
+static void rtp_ioqueue_thread_remove(struct ast_rtp_ioqueue_thread *ioqueue)
+{
+	int destroy = 0;
+
+	/* If nothing is using this ioqueue thread destroy it */
+	AST_LIST_LOCK(&ioqueues);
+	if ((ioqueue->count - 2) == 0) {
+		destroy = 1;
+		AST_LIST_REMOVE(&ioqueues, ioqueue, next);
+	}
+	AST_LIST_UNLOCK(&ioqueues);
+
+	if (!destroy) {
+		return;
+	}
+
+	rtp_ioqueue_thread_destroy(ioqueue);
+}
+
+/*! \brief Finder and allocator for an ioqueue thread */
+static struct ast_rtp_ioqueue_thread *rtp_ioqueue_thread_get_or_create(void)
+{
+	struct ast_rtp_ioqueue_thread *ioqueue;
+	pj_lock_t *lock;
+
+	AST_LIST_LOCK(&ioqueues);
+
+	/* See if an ioqueue thread exists that can handle more */
+	AST_LIST_TRAVERSE(&ioqueues, ioqueue, next) {
+		if ((ioqueue->count + 2) < PJ_IOQUEUE_MAX_HANDLES) {
+			break;
+		}
+	}
+
+	/* If we found one bump it up and return it */
+	if (ioqueue) {
+		ioqueue->count += 2;
+		goto end;
+	}
+
+	ioqueue = ast_calloc(1, sizeof(*ioqueue));
+	if (!ioqueue) {
+		goto end;
+	}
+
+	ioqueue->pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL);
+
+	/* We use a timer on the ioqueue thread for TURN so that two threads aren't operating
+	 * on a session at the same time
+	 */
+	if (pj_timer_heap_create(ioqueue->pool, 4, &ioqueue->timerheap) != PJ_SUCCESS) {
+		goto fatal;
+	}
+
+	if (pj_lock_create_recursive_mutex(ioqueue->pool, "rtp%p", &lock) != PJ_SUCCESS) {
+		goto fatal;
+	}
+
+	pj_timer_heap_set_lock(ioqueue->timerheap, lock, PJ_TRUE);
+
+	if (pj_ioqueue_create(ioqueue->pool, 16, &ioqueue->ioqueue) != PJ_SUCCESS) {
+		goto fatal;
+	}
+
+	if (pj_thread_create(ioqueue->pool, "ice", &ioqueue_worker_thread, ioqueue, 0, 0, &ioqueue->thread) != PJ_SUCCESS) {
+		goto fatal;
+	}
+
+	AST_LIST_INSERT_HEAD(&ioqueues, ioqueue, next);
+
+	/* Since this is being returned to an active session the count always starts at 2 */
+	ioqueue->count = 2;
+
+	goto end;
+
+fatal:
+	rtp_ioqueue_thread_destroy(ioqueue);
+	ioqueue = NULL;
+
+end:
+	AST_LIST_UNLOCK(&ioqueues);
+	return ioqueue;
+}
+
+static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component,
+		enum ast_transport transport, const char *server, unsigned int port, const char *username, const char *password)
+{
+	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	pj_turn_sock **turn_sock;
+	const pj_turn_sock_cb *turn_cb;
+	pj_turn_tp_type conn_type;
+	int conn_transport;
+	pj_stun_auth_cred cred = { 0, };
+	pj_str_t turn_addr;
+	struct ast_sockaddr addr = { { 0, } };
+	pj_stun_config stun_config;
+	struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
+	struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
+	pj_turn_session_info info;
+	struct ast_sockaddr local, loop;
+
+	ast_rtp_instance_get_local_address(instance, &local);
+	if (ast_sockaddr_is_ipv4(&local)) {
+		ast_sockaddr_parse(&loop, "127.0.0.1", PARSE_PORT_FORBID);
+	} else {
+		ast_sockaddr_parse(&loop, "::1", PARSE_PORT_FORBID);
+	}
+
+	/* Determine what component we are requesting a TURN session for */
+	if (component == AST_RTP_ICE_COMPONENT_RTP) {
+		turn_sock = &rtp->turn_rtp;
+		turn_cb = &ast_rtp_turn_rtp_sock_cb;
+		conn_transport = TRANSPORT_TURN_RTP;
+		ast_sockaddr_set_port(&loop, ast_sockaddr_port(&local));
+	} else if (component == AST_RTP_ICE_COMPONENT_RTCP) {
+		turn_sock = &rtp->turn_rtcp;
+		turn_cb = &ast_rtp_turn_rtcp_sock_cb;
+		conn_transport = TRANSPORT_TURN_RTCP;
+		ast_sockaddr_set_port(&loop, ast_sockaddr_port(&rtp->rtcp->us));
+	} else {
+		return;
+	}
+
+	if (transport == AST_TRANSPORT_UDP) {
+		conn_type = PJ_TURN_TP_UDP;
+	} else if (transport == AST_TRANSPORT_TCP) {
+		conn_type = PJ_TURN_TP_TCP;
+	} else {
+		ast_assert(0);
+		return;
+	}
+
+	ast_sockaddr_parse(&addr, server, PARSE_PORT_FORBID);
+
+	ast_mutex_lock(&rtp->lock);
+	if (*turn_sock) {
+		pj_turn_sock_destroy(*turn_sock);
+		rtp->turn_state = PJ_TURN_STATE_NULL;
+		while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+			ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+		}
+	}
+	ast_mutex_unlock(&rtp->lock);
+
+	if (component == AST_RTP_ICE_COMPONENT_RTP && !rtp->ioqueue) {
+		rtp->ioqueue = rtp_ioqueue_thread_get_or_create();
+		if (!rtp->ioqueue) {
+			return;
+		}
+	}
+
+	pj_stun_config_init(&stun_config, &cachingpool.factory, 0, rtp->ioqueue->ioqueue, rtp->ioqueue->timerheap);
+
+	if (pj_turn_sock_create(&stun_config, ast_sockaddr_is_ipv4(&addr) ? pj_AF_INET() : pj_AF_INET6(), conn_type,
+		turn_cb, NULL, instance, turn_sock) != PJ_SUCCESS) {
+		ast_log(LOG_WARNING, "Could not create a TURN client socket\n");
+		return;
+	}
+
+	cred.type = PJ_STUN_AUTH_CRED_STATIC;
+	pj_strset2(&cred.data.static_cred.username, (char*)username);
+	cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+	pj_strset2(&cred.data.static_cred.data, (char*)password);
+
+	/* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
+	ast_mutex_lock(&rtp->lock);
+	pj_turn_sock_alloc(*turn_sock, pj_cstr(&turn_addr, server), port, NULL, &cred, NULL);
+	while (rtp->turn_state < PJ_TURN_STATE_READY) {
+		ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+	}
+	ast_mutex_unlock(&rtp->lock);
+
+	/* If a TURN session was allocated add it as a candidate */
+	if (rtp->turn_state != PJ_TURN_STATE_READY) {
+		return;
+	}
+
+	pj_turn_sock_get_info(*turn_sock, &info);
+
+	ast_rtp_ice_add_cand(rtp, component, conn_transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr,
+		&info.relay_addr, NULL, pj_sockaddr_get_len(&info.relay_addr));
+
+	if (component == AST_RTP_ICE_COMPONENT_RTP) {
+		ast_sockaddr_copy(&rtp->rtp_loop, &loop);
+	} else if (component == AST_RTP_ICE_COMPONENT_RTCP) {
+		ast_sockaddr_copy(&rtp->rtcp_loop, &loop);
+	}
 }
 
 static char *generate_random_string(char *buf, size_t size)
@@ -1237,6 +1602,22 @@
 {
 	struct ast_rtp_instance *instance = ice->user_data;
 	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+
+	if (status == PJ_SUCCESS) {
+		struct ast_sockaddr remote_address;
+
+		/* Symmetric RTP must be disabled for the remote address to not get overwritten */
+		ast_rtp_instance_set_prop(instance, AST_RTP_PROPERTY_NAT, 0);
+
+		update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
+		ast_rtp_instance_set_remote_address(instance, &remote_address);
+		turn_enable_bind_channel(rtp, rtp->turn_rtp, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP);
+
+		if (rtp->rtcp) {
+			update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &rtp->rtcp->them);
+			turn_enable_bind_channel(rtp, rtp->turn_rtcp, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP);
+		}
+	}
  
 #ifdef HAVE_OPENSSL_SRTP
 	dtls_perform_handshake(instance, &rtp->dtls, 0);
@@ -1261,7 +1642,13 @@
 
 	/* Instead of handling the packet here (which really doesn't work with our architecture) we set a bit to indicate that it should be handled after pj_ice_sess_on_rx_pkt
 	 * returns */
-	rtp->passthrough = 1;
+	if (transport_id == TRANSPORT_SOCKET_RTP || transport_id == TRANSPORT_SOCKET_RTCP) {
+		rtp->passthrough = 1;
+	} else if (transport_id == TRANSPORT_TURN_RTP) {
+		rtp->rtp_passthrough = 1;
+	} else if (transport_id == TRANSPORT_TURN_RTCP) {
+		rtp->rtcp_passthrough = 1;
+	}
 }
 
 static pj_status_t ast_rtp_on_ice_tx_pkt(pj_ice_sess *ice, unsigned comp_id, unsigned transport_id, const void *pkt, pj_size_t size, const pj_sockaddr_t *dst_addr, unsigned dst_addr_len)
@@ -1307,106 +1694,11 @@
 	.on_tx_pkt = ast_rtp_on_ice_tx_pkt,
 };
 
-static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
-{
-	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
-	struct ast_sockaddr dest = { { 0, }, };
-
-	ast_rtp_instance_get_local_address(instance, &dest);
-
-	ast_sendto(rtp->s, pkt, pkt_len, 0, &dest);
-}
-
-static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
-{
-	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-	struct ast_rtp *rtp = NULL;
-
-	/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
-	if (!instance) {
-		return;
-	}
-
-	rtp = ast_rtp_instance_get_data(instance);
-
-	/* If the TURN session is being destroyed we need to remove it from the RTP instance */
-	if (new_state == PJ_TURN_STATE_DESTROYING) {
-		rtp->turn_rtp = NULL;
-		return;
-	}
-
-	/* We store the new state so the other thread can actually handle it */
-	ast_mutex_lock(&rtp->lock);
-	rtp->turn_state = new_state;
-
-	/* If this is a state that the main thread should be notified about do so */
-	if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) {
-		ast_cond_signal(&rtp->cond);
-	}
-
-	ast_mutex_unlock(&rtp->lock);
-}
-
-/* RTP TURN Socket interface declaration */
-static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
-	.on_rx_data = ast_rtp_on_turn_rx_rtp_data,
-	.on_state = ast_rtp_on_turn_rtp_state,
-};
-
-static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
-{
-	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
-
-	ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp->us);
-}
-
-static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
-{
-	struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-	struct ast_rtp *rtp = NULL;
-
-	/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
-	if (!instance) {
-		return;
-	}
-
-	rtp = ast_rtp_instance_get_data(instance);
-
-	/* If the TURN session is being destroyed we need to remove it from the RTP instance */
-	if (new_state == PJ_TURN_STATE_DESTROYING) {
-		rtp->turn_rtcp = NULL;
-		return;
-	}
-
-	/* We store the new state so the other thread can actually handle it */
-	ast_mutex_lock(&rtp->lock);
-	rtp->turn_state = new_state;
-
-	/* If this is a state that the main thread should be notified about do so */
-	if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) {
-		ast_cond_signal(&rtp->cond);
-	}
-
-       ast_mutex_unlock(&rtp->lock);
-}
-
-/* RTCP TURN Socket interface declaration */
-static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
-	.on_rx_data = ast_rtp_on_turn_rx_rtcp_data,
-	.on_state = ast_rtp_on_turn_rtcp_state,
-};
-
-/*! \brief Worker thread for I/O queue and timerheap */
-static int ice_worker_thread(void *data)
-{
-	while (!worker_terminate) {
-		const pj_time_val delay = {0, 10};
-
-		pj_ioqueue_poll(ioqueue, &delay);
-
-		pj_timer_heap_poll(timerheap, NULL);
+/*! \brief Worker thread for timerheap */
+static int timer_worker_thread(void *data)
+{
+	while (!timer_terminate) {
+		pj_timer_heap_poll(timer_heap, NULL);
 	}
 
 	return 0;
@@ -1695,6 +1987,9 @@
 	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
 	struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance);
 	char *in = buf;
+#ifdef HAVE_PJPROJECT
+	struct ast_sockaddr *loop = rtcp ? &rtp->rtcp_loop : &rtp->rtp_loop;
+#endif
 
 	if ((len = ast_recvfrom(rtcp ? rtp->rtcp->s : rtp->s, buf, size, flags, sa)) < 0) {
 	   return len;
@@ -1750,7 +2045,16 @@
 #endif
 
 #ifdef HAVE_PJPROJECT
-	if (rtp->ice) {
+	if (!ast_sockaddr_isnull(loop) && !ast_sockaddr_cmp(loop, sa)) {
+		/* ICE traffic will have been handled in the TURN callback, so skip it but update the address
+		 * so it reflects the actual source and not the loopback
+		 */
+		if (rtcp) {
+			ast_sockaddr_copy(sa, &rtp->rtcp->them);
+		} else {
+			ast_rtp_instance_get_remote_address(instance, sa);
+		}
+	} else if (rtp->ice) {
 		pj_str_t combined = pj_str(ast_sockaddr_stringify(sa));
 		pj_sockaddr address;
 		pj_status_t status;
@@ -1767,7 +2071,7 @@
 
 			pj_strerror(status, buf, sizeof(buf));
 			ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
-				(int) status, buf);
+				(int)status, buf);
 			return -1;
 		}
 		if (!rtp->passthrough) {
@@ -1936,7 +2240,7 @@
 
 #ifdef HAVE_PJPROJECT
 static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct ast_rtp *rtp, struct ast_sockaddr *addr, int port, int component,
-				      int transport, const pj_turn_sock_cb *turn_cb, pj_turn_sock **turn_sock)
+				      int transport)
 {
 	pj_sockaddr address[16];
 	unsigned int count = PJ_ARRAY_SIZE(address), pos = 0;
@@ -1975,38 +2279,9 @@
 	}
 
 	/* If configured to use a TURN relay create a session and allocate */
-	if (pj_strlen(&turnaddr) && pj_turn_sock_create(&rtp->ice->stun_cfg, ast_sockaddr_is_ipv4(addr) ? pj_AF_INET() : pj_AF_INET6(), PJ_TURN_TP_TCP,
-							turn_cb, NULL, instance, turn_sock) == PJ_SUCCESS) {
-		pj_stun_auth_cred cred = { 0, };
-		struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_ALLOCATION_WAIT_TIME, 1000));
-		struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
-
-		cred.type = PJ_STUN_AUTH_CRED_STATIC;
-		cred.data.static_cred.username = turnusername;
-		cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
-		cred.data.static_cred.data = turnpassword;
-
-		/* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
-		ast_mutex_lock(&rtp->lock);
-		pj_turn_sock_alloc(*turn_sock, &turnaddr, turnport, NULL, &cred, NULL);
-		ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
-		ast_mutex_unlock(&rtp->lock);
-
-		/* If a TURN session was allocated add it as a candidate */
-		if (rtp->turn_state == PJ_TURN_STATE_READY) {
-			pj_turn_session_info info;
-
-			pj_turn_sock_get_info(*turn_sock, &info);
-
-			if (transport == TRANSPORT_SOCKET_RTP) {
-				transport = TRANSPORT_TURN_RTP;
-			} else if (transport == TRANSPORT_SOCKET_RTCP) {
-				transport = TRANSPORT_TURN_RTCP;
-			}
-
-			ast_rtp_ice_add_cand(rtp, component, transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr, &info.relay_addr,
-					     NULL, pj_sockaddr_get_len(&info.relay_addr));
-		}
+	if (pj_strlen(&turnaddr)) {
+		ast_rtp_ice_turn_request(instance, component, AST_TRANSPORT_TCP, pj_strbuf(&turnaddr), turnport,
+			pj_strbuf(&turnusername), pj_strbuf(&turnpassword));
 	}
 }
 #endif
@@ -2065,7 +2340,7 @@
 
 	pj_thread_register_check();
 
-	pj_stun_config_init(&stun_config, &cachingpool.factory, 0, ioqueue, timerheap);
+	pj_stun_config_init(&stun_config, &cachingpool.factory, 0, NULL, timer_heap);
 
 	ufrag = pj_str(rtp->local_ufrag);
 	passwd = pj_str(rtp->local_passwd);
@@ -2078,14 +2353,14 @@
 
 		/* Add all of the available candidates to the ICE session */
 		rtp_add_candidates_to_ice(instance, rtp, addr, port, AST_RTP_ICE_COMPONENT_RTP,
-			TRANSPORT_SOCKET_RTP, &ast_rtp_turn_rtp_sock_cb, &rtp->turn_rtp);
+			TRANSPORT_SOCKET_RTP);
 
 		/* Only add the RTCP candidates to ICE when replacing the session. New sessions
 		 * handle this in a separate part of the setup phase */
 		if (replace && rtp->rtcp) {
 			rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us,
 				ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP,
-				TRANSPORT_SOCKET_RTCP, &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp);
+				TRANSPORT_SOCKET_RTCP);
 		}
 
 		return 0;
@@ -2190,6 +2465,8 @@
 static int ast_rtp_destroy(struct ast_rtp_instance *instance)
 {
 	struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+	struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
+	struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
 
 	/* Destroy the smoother that was smoothing out audio if present */
 	if (rtp->smoother) {
@@ -2226,21 +2503,33 @@
 #ifdef HAVE_PJPROJECT
 	pj_thread_register_check();
 
+	/* Destroy the RTP TURN relay if being used */
+	ast_mutex_lock(&rtp->lock);
+	if (rtp->turn_rtp) {
+		pj_turn_sock_destroy(rtp->turn_rtp);
+		rtp->turn_state = PJ_TURN_STATE_NULL;
+		while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+			ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+		}
+	}
+
+	/* Destroy the RTCP TURN relay if being used */
+	if (rtp->turn_rtcp) {
+		pj_turn_sock_destroy(rtp->turn_rtcp);
+		rtp->turn_state = PJ_TURN_STATE_NULL;
+		while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+			ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+		}
+	}
+	ast_mutex_unlock(&rtp->lock);
+
+	if (rtp->ioqueue) {
+		rtp_ioqueue_thread_remove(rtp->ioqueue);
+	}
+
 	/* Destroy the ICE session if being used */
 	if (rtp->ice) {
 		pj_ice_sess_destroy(rtp->ice);
-	}
-
-	/* Destroy the RTP TURN relay if being used */
-	if (rtp->turn_rtp) {
-		pj_turn_sock_set_user_data(rtp->turn_rtp, NULL);
-		pj_turn_sock_destroy(rtp->turn_rtp);
-	}
-
-	/* Destroy the RTCP TURN relay if being used */
-	if (rtp->turn_rtcp) {
-		pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL);
-		pj_turn_sock_destroy(rtp->turn_rtcp);
 	}
 
 	/* Destroy any candidates */
@@ -2343,7 +2632,6 @@
 				ast_sockaddr_stringify(&remote_address),
 				strerror(errno));
 		}
-		update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
 		if (rtp_debug_test_addr(&remote_address)) {
 			ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
 				    ast_sockaddr_stringify(&remote_address),
@@ -2392,8 +2680,6 @@
 			ast_sockaddr_stringify(&remote_address),
 			strerror(errno));
 	}
-
-	update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
 
 	if (rtp_debug_test_addr(&remote_address)) {
 		ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
@@ -2468,8 +2754,6 @@
 				ast_sockaddr_stringify(&remote_address),
 				strerror(errno));
 		}
-
-		update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
 
 		if (rtp_debug_test_addr(&remote_address)) {
 			ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
@@ -2731,8 +3015,6 @@
 		rtp->rtcp->rr_count++;
 	}
 
-	update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &remote_address);
-
 	if (rtcp_debug_test_addr(&rtp->rtcp->them)) {
 		ast_verbose("* Sent RTCP %s to %s%s\n", sr ? "SR" : "RR",
 				ast_sockaddr_stringify(&remote_address), ice ? " (via ICE)" : "");
@@ -2925,8 +3207,6 @@
 				}
 			}
 		}
-
-		update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
 
 		if (rtp_debug_test_addr(&remote_address)) {
 			ast_verbose("Sent RTP packet to      %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
@@ -3900,8 +4180,6 @@
 		return 0;
 	}
 
-	update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
 	if (rtp_debug_test_addr(&remote_address)) {
 		ast_verbose("Sent RTP P2P packet to %s%s (type %-2.2d, len %-6.6d)\n",
 			    ast_sockaddr_stringify(&remote_address),
@@ -4359,8 +4637,7 @@
 
 #ifdef HAVE_PJPROJECT
 			if (rtp->ice) {
-				rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP,
-							  &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp);
+				rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP);
 			}
 #endif
 
@@ -4652,8 +4929,6 @@
 		return res;
 	}
 
-	update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
 	if (rtp_debug_test_addr(&remote_address)) {
 		ast_verbose("Sent Comfort Noise RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
 			    ast_sockaddr_stringify(&remote_address),
@@ -4920,17 +5195,17 @@
 			if (ast_parse_arg(s, PARSE_INADDR, &addr)) {
 				ast_log(LOG_WARNING, "Invalid TURN server address: %s\n", s);
 			} else {
-				pj_strdup2(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr));
+				pj_strdup2_with_null(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr));
 				/* ntohs() is not a bug here. The port number is used in host byte order with
 				 * a pjnat API. */
 				turnport = ntohs(addr.sin_port);
 			}
 		}
 		if ((s = ast_variable_retrieve(cfg, "general", "turnusername"))) {
-			pj_strdup2(pool, &turnusername, s);
+			pj_strdup2_with_null(pool, &turnusername, s);
 		}
 		if ((s = ast_variable_retrieve(cfg, "general", "turnpassword"))) {
-			pj_strdup2(pool, &turnpassword, s);
+			pj_strdup2_with_null(pool, &turnpassword, s);
 		}
 #endif
 		ast_config_destroy(cfg);
@@ -4950,6 +5225,20 @@
 	return 0;
 }
 
+#ifdef HAVE_PJPROJECT
+static void rtp_terminate_pjproject(void)
+{
+	if (timer_thread) {
+		timer_terminate = 1;
+		pj_thread_join(timer_thread);
+		pj_thread_destroy(timer_thread);
+	}
+
+	pj_caching_pool_destroy(&cachingpool);
+	pj_shutdown();
+}
+#endif
+
 static int load_module(void)
 {
 #ifdef HAVE_PJPROJECT
@@ -4960,65 +5249,49 @@
 	}
 
 	if (pjlib_util_init() != PJ_SUCCESS) {
-		pj_shutdown();
+		rtp_terminate_pjproject();
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
 	if (pjnath_init() != PJ_SUCCESS) {
-		pj_shutdown();
+		rtp_terminate_pjproject();
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
 	pj_caching_pool_init(&cachingpool, &pj_pool_factory_default_policy, 0);
 
-	pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL);
-
-	if (pj_timer_heap_create(pool, 100, &timerheap) != PJ_SUCCESS) {
-		pj_caching_pool_destroy(&cachingpool);
-		pj_shutdown();
+	pool = pj_pool_create(&cachingpool.factory, "timer", 512, 512, NULL);
+
+	if (pj_timer_heap_create(pool, 100, &timer_heap) != PJ_SUCCESS) {
+		rtp_terminate_pjproject();
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
 	if (pj_lock_create_recursive_mutex(pool, "rtp%p", &lock) != PJ_SUCCESS) {
-		pj_caching_pool_destroy(&cachingpool);
-		pj_shutdown();
+		rtp_terminate_pjproject();
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
-	pj_timer_heap_set_lock(timerheap, lock, PJ_TRUE);
-
-	if (pj_ioqueue_create(pool, 16, &ioqueue) != PJ_SUCCESS) {
-		pj_caching_pool_destroy(&cachingpool);
-		pj_shutdown();
+	pj_timer_heap_set_lock(timer_heap, lock, PJ_TRUE);
+
+	if (pj_thread_create(pool, "timer", &timer_worker_thread, NULL, 0, 0, &timer_thread) != PJ_SUCCESS) {
+		rtp_terminate_pjproject();
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
-	if (pj_thread_create(pool, "ice", &ice_worker_thread, NULL, 0, 0, &thread) != PJ_SUCCESS) {
-		pj_caching_pool_destroy(&cachingpool);
-		pj_shutdown();
-		return AST_MODULE_LOAD_DECLINE;
-	}
 #endif
 
 	if (ast_rtp_engine_register(&asterisk_rtp_engine)) {
 #ifdef HAVE_PJPROJECT
-		worker_terminate = 1;
-		pj_thread_join(thread);
-		pj_thread_destroy(thread);
-		pj_caching_pool_destroy(&cachingpool);
-		pj_shutdown();
+		rtp_terminate_pjproject();
 #endif
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
 	if (ast_cli_register_multiple(cli_rtp, ARRAY_LEN(cli_rtp))) {
 #ifdef HAVE_PJPROJECT
-		worker_terminate = 1;
-		pj_thread_join(thread);
-		pj_thread_destroy(thread);
 		ast_rtp_engine_unregister(&asterisk_rtp_engine);
-		pj_caching_pool_destroy(&cachingpool);
-		pj_shutdown();
+		rtp_terminate_pjproject();
 #endif
 		return AST_MODULE_LOAD_DECLINE;
 	}
@@ -5034,15 +5307,8 @@
 	ast_cli_unregister_multiple(cli_rtp, ARRAY_LEN(cli_rtp));
 
 #ifdef HAVE_PJPROJECT
-	worker_terminate = 1;
-
 	pj_thread_register_check();
-
-	pj_thread_join(thread);
-	pj_thread_destroy(thread);
-
-	pj_caching_pool_destroy(&cachingpool);
-	pj_shutdown();
+	rtp_terminate_pjproject();
 #endif
 
 	return 0;




More information about the asterisk-commits mailing list