[asterisk-commits] qwell: branch qwell/fun_with_transports r384030 - /team/qwell/fun_with_transports/res/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 27 10:44:10 CDT 2013


Author: qwell
Date: Wed Mar 27 10:44:06 2013
New Revision: 384030

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=384030
Log:
Fix up (read: add) threading.  It might actually be doing the right thing now.

In passing, fix some module dependencies that were causing build warnings.

Modified:
    team/qwell/fun_with_transports/res/res_sip_nat.c
    team/qwell/fun_with_transports/res/res_sip_outbound_registration.c
    team/qwell/fun_with_transports/res/res_sip_registrar.c
    team/qwell/fun_with_transports/res/res_sip_transport_websocket.c

Modified: team/qwell/fun_with_transports/res/res_sip_nat.c
URL: http://svnview.digium.com/svn/asterisk/team/qwell/fun_with_transports/res/res_sip_nat.c?view=diff&rev=384030&r1=384029&r2=384030
==============================================================================
--- team/qwell/fun_with_transports/res/res_sip_nat.c (original)
+++ team/qwell/fun_with_transports/res/res_sip_nat.c Wed Mar 27 10:44:06 2013
@@ -17,6 +17,7 @@
  */
 
 /*** MODULEINFO
+	<depend>pjproject</depend>
 	<support_level>core</support_level>
  ***/
 

Modified: team/qwell/fun_with_transports/res/res_sip_outbound_registration.c
URL: http://svnview.digium.com/svn/asterisk/team/qwell/fun_with_transports/res/res_sip_outbound_registration.c?view=diff&rev=384030&r1=384029&r2=384030
==============================================================================
--- team/qwell/fun_with_transports/res/res_sip_outbound_registration.c (original)
+++ team/qwell/fun_with_transports/res/res_sip_outbound_registration.c Wed Mar 27 10:44:06 2013
@@ -17,6 +17,7 @@
  */
 
 /*** MODULEINFO
+	<depend>pjproject</depend>
 	<support_level>core</support_level>
  ***/
 

Modified: team/qwell/fun_with_transports/res/res_sip_registrar.c
URL: http://svnview.digium.com/svn/asterisk/team/qwell/fun_with_transports/res/res_sip_registrar.c?view=diff&rev=384030&r1=384029&r2=384030
==============================================================================
--- team/qwell/fun_with_transports/res/res_sip_registrar.c (original)
+++ team/qwell/fun_with_transports/res/res_sip_registrar.c Wed Mar 27 10:44:06 2013
@@ -17,6 +17,7 @@
  */
 
 /*** MODULEINFO
+	<depend>pjproject</depend>
 	<support_level>core</support_level>
  ***/
 

Modified: team/qwell/fun_with_transports/res/res_sip_transport_websocket.c
URL: http://svnview.digium.com/svn/asterisk/team/qwell/fun_with_transports/res/res_sip_transport_websocket.c?view=diff&rev=384030&r1=384029&r2=384030
==============================================================================
--- team/qwell/fun_with_transports/res/res_sip_transport_websocket.c (original)
+++ team/qwell/fun_with_transports/res/res_sip_transport_websocket.c Wed Mar 27 10:44:06 2013
@@ -17,6 +17,7 @@
  */
 
 /*** MODULEINFO
+	<depend>pjproject</depend>
 	<use type="module">res_http_websocket</use>
 	<support_level>core</support_level>
  ***/
@@ -28,6 +29,7 @@
 #include "asterisk/http_websocket.h"
 #include "asterisk/res_sip.h"
 #include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
 
 static int transport_type_ws;
 static int transport_type_wss;
@@ -35,6 +37,17 @@
 struct ws_transport {
 	pjsip_transport transport;
 	struct ast_websocket *ws_session;
+};
+
+struct ws_create_data {
+	struct ws_transport *transport;
+	struct ast_websocket *ws_session;
+};
+
+struct ws_read_data {
+	struct ws_transport *transport;
+	char *payload;
+	uint64_t payload_len;
 };
 
 static pj_status_t ws_send_msg(pjsip_transport *transport,
@@ -76,21 +89,37 @@
 	return PJ_SUCCESS;
 }
 
-static int ws_recv(void *data)
-{
-	struct ast_websocket *session = (struct ast_websocket *)data;
+static int ws_shutdown_transport(void *data)
+{
+	pjsip_transport *transport = data;
+	pjsip_transport_shutdown(transport);
+	return 0;
+}
+
+static int ws_create_transport(void *data)
+{
+	struct ws_create_data *create_data = data;
+	struct ws_transport *newtransport;
 
 	pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
 	struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
 
 	pj_pool_t *pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512);
-	struct ws_transport *newtransport = PJ_POOL_ZALLOC_T(pool, struct ws_transport);
+
+	pj_str_t buf;
+
+	newtransport = PJ_POOL_ZALLOC_T(pool, struct ws_transport);
+
+	newtransport->ws_session = create_data->ws_session;
 
 	pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
 	pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
 
 	newtransport->transport.pool = pool;
-	newtransport->transport.key.type = ast_websocket_is_secure(session) ? transport_type_wss : transport_type_ws;
+	newtransport->transport.addr_len = sizeof(pj_sockaddr_in);
+	pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session))), &newtransport->transport.key.rem_addr);
+	newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET();
+	newtransport->transport.key.type = ast_websocket_is_secure(newtransport->ws_session) ? transport_type_wss : transport_type_ws;
 	newtransport->transport.type_name = (char*)pjsip_transport_get_type_name(newtransport->transport.key.type);
 	newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
 	newtransport->transport.info = (char*) pj_pool_alloc(newtransport->transport.pool, 64);
@@ -100,71 +129,92 @@
 	newtransport->transport.send_msg = &ws_send_msg;
 	newtransport->transport.do_shutdown = &ws_shutdown;
 	newtransport->transport.destroy = &ws_destroy_transport;
-	newtransport->ws_session = session;
-
-	pj_atomic_inc(newtransport->transport.ref_cnt);
 
 	pjsip_transport_register(newtransport->transport.tpmgr, (pjsip_transport *)newtransport);
+
+	create_data->transport = newtransport;
+	return 0;
+}
+
+static int ws_read(void *data)
+{
+	struct ws_read_data *read_data = data;
+	struct ws_transport *newtransport = read_data->transport;
+	struct ast_websocket *session = newtransport->ws_session;
+
+	pjsip_rx_data *rdata;
+	int recvd;
+	pj_str_t buf;
+
+	rdata = PJ_POOL_ZALLOC_T(newtransport->transport.pool, pjsip_rx_data);
+
+	rdata->tp_info.pool = newtransport->transport.pool;
+	rdata->tp_info.transport = &newtransport->transport;
+
+	pj_gettimeofday(&rdata->pkt_info.timestamp);
+
+	pj_memcpy(rdata->pkt_info.packet, read_data->payload, read_data->payload_len);
+	rdata->pkt_info.len = read_data->payload_len;
+	rdata->pkt_info.zero = 0;
+
+	pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
+	rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
+
+	rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
+
+	pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
+	rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
+
+	recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
+
+	return (read_data->payload_len == recvd) ? 0 : -1;
+}
+
+static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
+{
+	struct ast_taskprocessor *serializer = NULL;
+	struct ws_transport *transport = NULL;
 
 	if (ast_websocket_set_nonblock(session)) {
 		ast_websocket_unref(session);
-		return 0;
-	}
-
-	while ((ast_wait_for_input(ast_websocket_fd(session), -1)) > 0) {
-		char *payload;
-		uint64_t payload_len;
+		return;
+	}
+
+	serializer = ast_sip_create_serializer();
+
+	while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
+		struct ws_read_data read_data;
 		enum ast_websocket_opcode opcode;
 		int fragmented;
 
-		if (ast_websocket_read(session, &payload, &payload_len, &opcode, &fragmented)) {
+		if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
 			break;
 		}
 
 		if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
-			pjsip_rx_data *rdata;
-			int recvd;
-			pj_str_t buf;
-
-			rdata = PJ_POOL_ZALLOC_T(newtransport->transport.pool, pjsip_rx_data);
-
-			rdata->tp_info.pool = newtransport->transport.pool;
-			rdata->tp_info.transport = &newtransport->transport;
-
-			pj_gettimeofday(&rdata->pkt_info.timestamp);
-
-			pj_memcpy(rdata->pkt_info.packet, payload, payload_len);
-			rdata->pkt_info.len = payload_len;
-			rdata->pkt_info.zero = 0;
-
-		        pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
-			rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
-
-			rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
-
-			pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
-			rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
-
-//			rdata->pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
-
-			recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
+			if (!transport) {
+				struct ws_create_data create_data;
+				create_data.ws_session = session;
+
+				ast_sip_push_task_synchronous(serializer, ws_create_transport, &create_data);
+
+				transport = create_data.transport;
+			}
+
+			read_data.transport = transport;
+
+			ast_sip_push_task(serializer, ws_read, &read_data);
 		} else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
-			return -1;
+			break;
 		}
 	}
 
-	pj_atomic_dec(newtransport->transport.ref_cnt);
-
-	pjsip_transport_shutdown(&newtransport->transport);
-
+	if (transport) {
+		ast_sip_push_task_synchronous(serializer, ws_shutdown_transport, transport);
+	}
+
+	ast_taskprocessor_unreference(serializer);
 	ast_websocket_unref(session);
-
-	return 0;
-}
-
-static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
-{
-	ast_sip_push_task(NULL, ws_recv, session);
 }
 
 static int load_module(void)




More information about the asterisk-commits mailing list