[asterisk-commits] file: branch file/serialize_the_WORLD r382222 - in /team/file/serialize_the_W...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Feb 27 17:19:42 CST 2013


Author: file
Date: Wed Feb 27 17:19:39 2013
New Revision: 382222

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382222
Log:
Move serialization to the distributor.

This adds support for specifying a serializer on a per-dialog basis. When a request
or response comes in the dialog is found and the serializer used. This also means that
all modules involved in handling a message are invoked within the serializer, and not
partially inside and outside. As a result in most cases you no longer have to push your
own tasks.

Modified:
    team/file/serialize_the_WORLD/include/asterisk/res_sip.h
    team/file/serialize_the_WORLD/res/res_sip.exports.in
    team/file/serialize_the_WORLD/res/res_sip/sip_distributor.c
    team/file/serialize_the_WORLD/res/res_sip_session.c

Modified: team/file/serialize_the_WORLD/include/asterisk/res_sip.h
URL: http://svnview.digium.com/svn/asterisk/team/file/serialize_the_WORLD/include/asterisk/res_sip.h?view=diff&rev=382222&r1=382221&r2=382222
==============================================================================
--- team/file/serialize_the_WORLD/include/asterisk/res_sip.h (original)
+++ team/file/serialize_the_WORLD/include/asterisk/res_sip.h Wed Feb 27 17:19:39 2013
@@ -541,7 +541,15 @@
  * \retval non-NULL Newly-created serializer
  */
 struct ast_taskprocessor *ast_sip_create_serializer(void);
- 
+
+/*!
+ * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
+ *
+ * \param dlg The SIP dialog itself
+ * \param serializer The serializer to use
+ */
+void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer);
+
 /*!
  * \brief Pushes a task to SIP servants
  *

Modified: team/file/serialize_the_WORLD/res/res_sip.exports.in
URL: http://svnview.digium.com/svn/asterisk/team/file/serialize_the_WORLD/res/res_sip.exports.in?view=diff&rev=382222&r1=382221&r2=382222
==============================================================================
--- team/file/serialize_the_WORLD/res/res_sip.exports.in (original)
+++ team/file/serialize_the_WORLD/res/res_sip.exports.in Wed Feb 27 17:19:39 2013
@@ -28,6 +28,7 @@
 		LINKER_SYMBOL_PREFIXast_sip_endpoint_get_location;
 		LINKER_SYMBOL_PREFIXast_pjsip_rdata_get_endpoint;
 		LINKER_SYMBOL_PREFIXast_sip_thread_is_servant;
+		LINKER_SYMBOL_PREFIXast_sip_dialog_set_serializer;
 		LINKER_SYMBOL_PREFIXpj_*;
 		LINKER_SYMBOL_PREFIXpjsip_*;
 		LINKER_SYMBOL_PREFIXpjmedia_*;

Modified: team/file/serialize_the_WORLD/res/res_sip/sip_distributor.c
URL: http://svnview.digium.com/svn/asterisk/team/file/serialize_the_WORLD/res/res_sip/sip_distributor.c?view=diff&rev=382222&r1=382221&r2=382222
==============================================================================
--- team/file/serialize_the_WORLD/res/res_sip/sip_distributor.c (original)
+++ team/file/serialize_the_WORLD/res/res_sip/sip_distributor.c Wed Feb 27 17:19:39 2013
@@ -24,14 +24,7 @@
 #include "asterisk/res_sip.h"
 
 static int distribute(void *data);
-
-static pj_bool_t distributor(pjsip_rx_data *rdata)
-{
-	pjsip_rx_data *clone;
-	pjsip_rx_data_clone(rdata, 0, &clone);
-	ast_sip_push_task(NULL, distribute, clone);
-	return PJ_TRUE;
-}
+static pj_bool_t distributor(pjsip_rx_data *rdata);
 
 static pjsip_module distributor_mod = {
 	.name = {"Request Distributor", 19},
@@ -39,6 +32,33 @@
 	.on_rx_request = distributor,
 	.on_rx_response = distributor,
 };
+
+void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
+{
+	pjsip_dlg_inc_lock(dlg);
+	pjsip_dlg_set_mod_data(dlg, distributor_mod.id, serializer);
+	pjsip_dlg_dec_lock(dlg);
+}
+
+static pj_bool_t distributor(pjsip_rx_data *rdata)
+{
+	pjsip_dialog *dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, &rdata->msg_info.to->tag, &rdata->msg_info.from->tag, PJ_TRUE);
+	struct ast_taskprocessor *serializer = NULL;
+	pjsip_rx_data *clone;
+
+	if (dlg) {
+		serializer = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
+	}
+
+	pjsip_rx_data_clone(rdata, 0, &clone);
+	ast_sip_push_task(serializer, distribute, clone);
+
+	if (dlg) {
+		pjsip_dlg_dec_lock(dlg);
+	}
+
+	return PJ_TRUE;
+}
 
 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
 

Modified: team/file/serialize_the_WORLD/res/res_sip_session.c
URL: http://svnview.digium.com/svn/asterisk/team/file/serialize_the_WORLD/res/res_sip_session.c?view=diff&rev=382222&r1=382221&r2=382222
==============================================================================
--- team/file/serialize_the_WORLD/res/res_sip_session.c (original)
+++ team/file/serialize_the_WORLD/res/res_sip_session.c Wed Feb 27 17:19:39 2013
@@ -46,7 +46,7 @@
 /* Some forward declarations */
 static void handle_incoming_request(struct ast_sip_session *session, pjsip_rx_data *rdata);
 static void handle_incoming_response(struct ast_sip_session *session, pjsip_rx_data *rdata);
-static int handle_incoming(void *data);
+static int handle_incoming(struct ast_sip_session *session, pjsip_rx_data *rdata);
 static void handle_outgoing_request(struct ast_sip_session *session, pjsip_tx_data *tdata);
 static void handle_outgoing_response(struct ast_sip_session *session, pjsip_tx_data *tdata);
 static void handle_outgoing(struct ast_sip_session *session, pjsip_tx_data *tdata);
@@ -502,6 +502,7 @@
 	if (!session->serializer) {
 		return NULL;
 	}
+	ast_sip_dialog_set_serializer(inv_session->dlg, session->serializer);
 	ao2_ref(endpoint, +1);
 	session->endpoint = endpoint;
 	session->inv_session = inv_session;
@@ -909,51 +910,17 @@
 	}
 }
 
-struct handle_incoming_data {
-	struct ast_sip_session *session;
-	pjsip_rx_data *rdata;
-};
-
-static void handle_incoming_data_destroy(void *obj)
-{
-	struct handle_incoming_data *hid = obj;
-	if (hid->rdata) {
-		pjsip_rx_data_free_cloned(hid->rdata);
-	}
-	if (hid->session) {
-		ao2_ref(hid->session, -1);
-	}
-}
-
-static struct handle_incoming_data *handle_incoming_data_alloc(struct ast_sip_session *session, pjsip_rx_data *rdata)
-{
-	struct handle_incoming_data *hid = ao2_alloc(sizeof(*hid), handle_incoming_data_destroy);
-	if (!hid) {
-		return NULL;
-	}
-	pjsip_rx_data_clone(rdata, 0, &hid->rdata);
-	if (!hid->rdata) {
-		ao2_ref(hid, -1);
-		return NULL;
-	}
-	ao2_ref(session, +1);
-	hid->session = session;
-	return hid;
-}
-
-static int handle_incoming(void *data)
-{
-	struct handle_incoming_data *hid = data;
-	struct ast_sip_session *session = hid->session;
-	pjsip_rx_data *rdata = hid->rdata;
+static int handle_incoming(struct ast_sip_session *session, pjsip_rx_data *rdata)
+{
 	ast_debug(3, "Received %s\n", rdata->msg_info.msg->type == PJSIP_REQUEST_MSG ?
 			"request" : "response");
+
 	if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
 		handle_incoming_request(session, rdata);
 	} else {
 		handle_incoming_response(session, rdata);
 	}
-	ao2_ref(hid, -1);
+
 	return 0;
 }
 
@@ -1017,6 +984,7 @@
 	}
 
 	session->inv_session->mod_data[session_module.id] = NULL;
+	ast_sip_dialog_set_serializer(session->inv_session->dlg, NULL);
 	ao2_cleanup(session);
 	return 0;
 }
@@ -1024,7 +992,7 @@
 static void session_inv_on_state_changed(pjsip_inv_session *inv, pjsip_event *e)
 {
 	struct ast_sip_session *session = inv->mod_data[session_module.id];
-	struct handle_incoming_data *hid;
+
 	ast_debug(3, "on_state_changed callback called. Event dump to follow\n");
 	ast_debug(3, "inv state is %s, event type is %s\n", pjsip_inv_state_name(inv->state), pjsip_event_str(e->type));
 	switch(e->type) {
@@ -1032,13 +1000,7 @@
 		handle_outgoing(session, e->body.tx_msg.tdata);
 		break;
 	case PJSIP_EVENT_RX_MSG:
-		hid = handle_incoming_data_alloc(session, e->body.rx_msg.rdata);
-		if (hid) {
-			if (ast_sip_push_task(session->serializer, handle_incoming, hid)) {
-				ast_log(LOG_WARNING, "Failed to pass in-dialog request to threadpool\n");
-				ao2_cleanup(hid);
-			}
-		}
+		handle_incoming(session, e->body.rx_msg.rdata);
 		break;
 	case PJSIP_EVENT_TSX_STATE:
 		ast_debug(3, "Source of transaction state change is %s\n", pjsip_event_str(e->body.tsx_state.type));
@@ -1048,13 +1010,7 @@
 			handle_outgoing(session, e->body.tsx_state.src.tdata);
 			break;
 		case PJSIP_EVENT_RX_MSG:
-			hid = handle_incoming_data_alloc(session, e->body.tsx_state.src.rdata);
-			if (hid) {
-				if (ast_sip_push_task(session->serializer, handle_incoming, hid)) {
-					ast_log(LOG_WARNING, "Failed to pass in-dialog request to threadpool\n");
-					ao2_cleanup(hid);
-				}
-			}
+			handle_incoming(session, e->body.tsx_state.src.rdata);
 			break;
 		case PJSIP_EVENT_TRANSPORT_ERROR:
 		case PJSIP_EVENT_TIMER:
@@ -1074,10 +1030,7 @@
 	}
 
 	if (inv->state == PJSIP_INV_STATE_DISCONNECTED) {
-		if (ast_sip_push_task(session->serializer, session_end, session)) {
-			ast_log(LOG_WARNING, "Failed to push session end task to threadpool. Ending session in-thread\n");
-			session_end(session);
-		}
+		session_end(session);
 	}
 }
 
@@ -1147,45 +1100,17 @@
 	return local;
 }
 
-struct on_rx_offer_data {
-	pjsip_inv_session *inv;
-	const pjmedia_sdp_session *offer;
+static void session_inv_on_rx_offer(pjsip_inv_session *inv, const pjmedia_sdp_session *offer)
+{
+	struct ast_sip_session *session = inv->mod_data[session_module.id];
 	pjmedia_sdp_session *answer;
-};
-
-static int on_rx_offer(void *data)
-{
-	struct on_rx_offer_data *orod = data;
-	pjsip_inv_session *inv = orod->inv;
-	const pjmedia_sdp_session *offer = orod->offer;
-	struct ast_sip_session *session = inv->mod_data[session_module.id];
 
 	if (handle_incoming_sdp(session, offer)) {
-		return -1;
-	}
-
-	orod->answer = create_local_sdp(inv, session, offer);
-
-	return 0;
-}
-
-static void session_inv_on_rx_offer(pjsip_inv_session *inv, const pjmedia_sdp_session *offer)
-{
-	struct ast_sip_session *session = inv->mod_data[session_module.id];
-	struct on_rx_offer_data orod;
-	orod.inv = inv;
-	orod.offer = offer;
-	orod.answer = NULL;
-
-	/* PJSIP requires us to have set an SDP answer when this returns, so we have to push this
-	 * task synchronously
-	 */
-	if (ast_sip_push_task_synchronous(session->serializer, on_rx_offer, &orod)) {
-		ast_log(LOG_WARNING, "Synchronous threadpool task to create SDP answer failed. Using saved local SDP\n");
-	}
-
-	if (orod.answer) {
-		pjsip_inv_set_sdp_answer(inv, orod.answer);
+		return;
+	}
+
+	if ((answer = create_local_sdp(inv, session, offer))) {
+		pjsip_inv_set_sdp_answer(inv, answer);
 	}
 }
 
@@ -1196,63 +1121,21 @@
 }
 #endif
 
-struct media_update_data {
-	struct ast_sip_session *session;
-	pjsip_inv_session *inv_session;
-	pj_status_t status;
-};
-
-static void media_update_data_destroy(void *obj)
-{
-	struct media_update_data *mud = obj;
-	ao2_cleanup(mud->session);
-}
-
-static struct media_update_data *media_update_data_alloc(struct ast_sip_session *session, pjsip_inv_session *inv_session, pj_status_t status)
-{
-	struct media_update_data *mud = ao2_alloc(sizeof(*mud), media_update_data_destroy);
-	if (!mud) {
-		return NULL;
-	}
-	ao2_ref(session, +1);
-	mud->session = session;
-	mud->inv_session = inv_session;
-	mud->status = status;
-	return mud;
-}
-
-static int on_media_update(void *data)
-{
-	struct media_update_data *mud = data;
-	struct ast_sip_session *session = mud->session;
-	struct pjsip_inv_session *inv = mud->inv_session;
+static void session_inv_on_media_update(pjsip_inv_session *inv, pj_status_t status)
+{
+	struct ast_sip_session *session = inv->mod_data[session_module.id];
 	const pjmedia_sdp_session *local, *remote;
 
-	if ((mud->status != PJ_SUCCESS) || (pjmedia_sdp_neg_get_active_local(inv->neg, &local) != PJ_SUCCESS) ||
+	if ((status != PJ_SUCCESS) || (pjmedia_sdp_neg_get_active_local(inv->neg, &local) != PJ_SUCCESS) ||
 		(pjmedia_sdp_neg_get_active_remote(inv->neg, &remote) != PJ_SUCCESS)) {
 		if (session->channel) {
 			ast_channel_hangupcause_set(session->channel, AST_CAUSE_BEARERCAPABILITY_NOTAVAIL);
 			ast_queue_hangup(session->channel);
 		}
-		return 0;
+		return;
 	}
 
 	handle_negotiated_sdp(session, local, remote);
-	ao2_ref(mud, -1);
-	return 0;
-}
-
-static void session_inv_on_media_update(pjsip_inv_session *inv, pj_status_t status)
-{
-	struct ast_sip_session *session = inv->mod_data[session_module.id];
-	struct media_update_data *mud = media_update_data_alloc(session, inv, status);
-	if (!mud) {
-		return;
-	}
-	if (ast_sip_push_task(session->serializer, on_media_update, mud)) {
-		ast_log(LOG_WARNING, "Failed to pass media update task to the threadpool\n");
-		ao2_cleanup(mud);
-	}
 }
 
 static pjsip_redirect_op session_inv_on_redirected(pjsip_inv_session *inv, const pjsip_uri *target, const pjsip_event *e)




More information about the asterisk-commits mailing list