[Asterisk-code-review] res pjsip/config transport: Allow reloading transports. (asterisk[13])

George Joseph asteriskteam at digium.com
Thu Jan 28 19:56:22 CST 2016


George Joseph has uploaded a new change for review.

  https://gerrit.asterisk.org/2125

Change subject: res_pjsip/config_transport:  Allow reloading transports.
......................................................................

res_pjsip/config_transport:  Allow reloading transports.

The 'reload' mechanism actually involves closing the underlying
socket and calling the appropriate udp, tcp or tls start functions
again.  Only outbound_registration, pubsub and session needed work
to reset the transport before sending requests to insure that the
pjsip transport didn't get pulled out from under them.

In my testing, no calls were dropped when a transport was changed
for any of the 3 transport types even if ip addresses or ports were
changed. To be on the safe side however, a new transport option was
added (allow_reload) which defaults to 'no'.  Unless it's explicitly
set to 'yes' for a transport, changes to that transport will be ignored
on a reload of res_pjsip.  This should preserve the current behavior.

Change-Id: I5e759850e25958117d4c02f62ceb7244d7ec9edf
---
M CHANGES
M configs/samples/pjsip.conf.sample
A contrib/ast-db-manage/config/versions/41bb5f0148d8_add_allow_reload_to_pjsip_transport.py
M include/asterisk/res_pjsip.h
M res/res_pjsip.c
M res/res_pjsip/config_transport.c
M res/res_pjsip_outbound_registration.c
M res/res_pjsip_pubsub.c
M res/res_pjsip_session.c
9 files changed, 153 insertions(+), 22 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/25/2125/1

diff --git a/CHANGES b/CHANGES
index 9259111..82b9f98 100644
--- a/CHANGES
+++ b/CHANGES
@@ -23,6 +23,13 @@
 res_pjsip
 ------------------
 
+ * Transports are now reloadable.  In testing, no in-progress calls were
+   disrupted if the ip address or port weren't changed, but the possibility
+   still exists.  To make sure there are no unintentional drops, a new option
+   'allow_reload', which defaults to 'no' has been added to transport.  If
+   left at the default, changes to the particular transport will be ignored.
+   If set to 'yes', changes (if any) will be applied.
+
  * Added new global option (regcontext) to pjsip. When set, Asterisk will
    dynamically create and destroy a NoOp priority 1 extension
    for a given endpoint who registers or unregisters with us.
diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample
index a91ece9..78fded6 100644
--- a/configs/samples/pjsip.conf.sample
+++ b/configs/samples/pjsip.conf.sample
@@ -810,6 +810,12 @@
                                 ; clients are slow to process the received
                                 ; information. Value is in milliseconds; default
                                 ; is 100 ms.
+;allow_reload=no    ; Although transports can now be reloaded, that may not be
+                    ; desirable because of the slight possibility of dropped
+                    ; calls. To make sure there are no unintentional drops, if
+                    ; this option is set to 'no' (the default) changes to the
+                    ; particular transport will be ignored. If set to 'yes',
+                    ; changes (if any) will be applied.
 
 ;==========================AOR SECTION OPTIONS=========================
 ;[aor]
diff --git a/contrib/ast-db-manage/config/versions/41bb5f0148d8_add_allow_reload_to_pjsip_transport.py b/contrib/ast-db-manage/config/versions/41bb5f0148d8_add_allow_reload_to_pjsip_transport.py
new file mode 100644
index 0000000..6274357
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/41bb5f0148d8_add_allow_reload_to_pjsip_transport.py
@@ -0,0 +1,26 @@
+"""Add allow_reload to pjsip transport
+
+Revision ID: 41bb5f0148d8
+Revises: 423f34ad36e2
+Create Date: 2016-01-28 16:47:29.616499
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '41bb5f0148d8'
+down_revision = '423f34ad36e2'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+YESNO_NAME = 'yesno_values'
+YESNO_VALUES = ['yes', 'no']
+
+def upgrade():
+    op.add_column('ps_transports', sa.Column('allow_reload', yesno_values))
+    pass
+
+def downgrade():
+    op.drop_column('ps_transports', 'allow_reload')
+    pass
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 5dc16bc..09d8667 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -132,6 +132,8 @@
 	unsigned int cos;
 	/*! Write timeout */
 	int write_timeout;
+	/*! Allow reload */
+	int allow_reload;
 };
 
 /*!
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index c94f6a9..13f4cee 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1026,6 +1026,14 @@
 						Value is in milliseconds; default is 100 ms.</para>
 					</description>
 				</configOption>
+				<configOption name="allow_reload" default="no">
+					<synopsis>Allow this transport to be reloaded.</synopsis>
+					<description>
+						<para>Allow this transport to be reloaded when res_pjsip is reloaded.
+						This option defaults to "no" because reloading a transport may disrupt
+						in-progress calls.</para>
+					</description>
+				</configOption>
 			</configObject>
 			<configObject name="contact">
 				<synopsis>A way of creating an aliased name to a SIP URI</synopsis>
diff --git a/res/res_pjsip/config_transport.c b/res/res_pjsip/config_transport.c
index 840824b..b5cd4d5 100644
--- a/res/res_pjsip/config_transport.c
+++ b/res/res_pjsip/config_transport.c
@@ -147,20 +147,32 @@
 	struct ast_sip_transport *transport = obj;
 	RAII_VAR(struct ast_sip_transport *, existing, ast_sorcery_retrieve_by_id(sorcery, "transport", ast_sorcery_object_get_id(obj)), ao2_cleanup);
 	pj_status_t res = -1;
+	struct ast_variable *changes = NULL;
 
-	if (!existing || !existing->state) {
-		if (!(transport->state = ao2_alloc(sizeof(*transport->state), transport_state_destroy))) {
-			ast_log(LOG_ERROR, "Transport state for '%s' could not be allocated\n", ast_sorcery_object_get_id(obj));
-			return -1;
+	if (existing) {
+		if (existing->state) {
+			transport->state = existing->state;
+			ao2_ref(transport->state, +1);
 		}
-	} else {
-		transport->state = existing->state;
-		ao2_ref(transport->state, +1);
+
+		if (!transport->allow_reload) {
+			ast_log(LOG_NOTICE, "Transport '%s'  is not reloadable, maintaining previous values\n", ast_sorcery_object_get_id(obj));
+			return 0;
+		}
+
+		ast_sorcery_diff(sorcery, existing, transport, &changes);
+		if (changes == NULL) {
+			return 0;
+		}
+		ast_variables_destroy(changes);
 	}
 
-	/* Once active a transport can not be reconfigured */
-	if (transport->state->transport || transport->state->factory) {
-		return -1;
+	if (!transport->state) {
+		if (!(transport->state = ao2_alloc(sizeof(*transport->state), transport_state_destroy))) {
+			ast_log(LOG_ERROR, "Transport state for '%s' could not be allocated\n",
+				ast_sorcery_object_get_id(obj));
+			return -1;
+		}
 	}
 
 	if (transport->host.addr.sa_family != PJ_AF_INET && transport->host.addr.sa_family != PJ_AF_INET6) {
@@ -192,12 +204,21 @@
 	}
 
 	if (transport->type == AST_TRANSPORT_UDP) {
+
+		if (existing && transport->state->transport) {
+				pjsip_udp_transport_pause(transport->state->transport, PJSIP_UDP_TRANSPORT_DESTROY_SOCKET);
+				/*
+				 * We need to sleep for a bit to allow the OS to release the socket or we'll get an
+				 * Address in Use error when we try to re-open.
+				 */
+				usleep(100000);
+		}
+
 		if (transport->host.addr.sa_family == pj_AF_INET()) {
-			res = pjsip_udp_transport_start(ast_sip_get_pjsip_endpoint(), &transport->host.ipv4, NULL, transport->async_operations, &transport->state->transport);
+			res = pjsip_udp_transport_start(ast_sip_get_pjsip_endpoint(), &transport->host.ipv4, NULL, transport->async_operations, 	&transport->state->transport);
 		} else if (transport->host.addr.sa_family == pj_AF_INET6()) {
 			res = pjsip_udp_transport_start6(ast_sip_get_pjsip_endpoint(), &transport->host.ipv6, NULL, transport->async_operations, &transport->state->transport);
 		}
-
 		if (res == PJ_SUCCESS && (transport->tos || transport->cos)) {
 			pj_sock_t sock;
 			pj_qos_params qos_params;
@@ -209,6 +230,11 @@
 		}
 	} else if (transport->type == AST_TRANSPORT_TCP) {
 		pjsip_tcp_transport_cfg cfg;
+
+		if (transport->state->factory && transport->state->factory->destroy) {
+			transport->state->factory->destroy(transport->state->factory);
+			usleep(100000);
+		}
 
 		pjsip_tcp_transport_cfg_default(&cfg, transport->host.addr.sa_family);
 		cfg.bind_addr = transport->host;
@@ -263,6 +289,11 @@
 		transport->tls.privkey_file = pj_str((char*)transport->privkey_file);
 		transport->tls.password = pj_str((char*)transport->password);
 		set_qos(transport, &transport->tls.qos_params);
+
+		if (transport->state->factory && transport->state->factory->destroy) {
+			transport->state->factory->destroy(transport->state->factory);
+			usleep(100000);
+		}
 
 		res = pjsip_tls_transport_start2(ast_sip_get_pjsip_endpoint(), &transport->tls, &transport->host, NULL, transport->async_operations, &transport->state->factory);
 	} else if ((transport->type == AST_TRANSPORT_WS) || (transport->type == AST_TRANSPORT_WSS)) {
@@ -775,9 +806,10 @@
 {
 	struct ast_sorcery *sorcery = ast_sip_get_sorcery();
 
+
 	ast_sorcery_apply_default(sorcery, "transport", "config", "pjsip.conf,criteria=type=transport");
 
-	if (ast_sorcery_object_register_no_reload(sorcery, "transport", transport_alloc, NULL, transport_apply)) {
+	if (ast_sorcery_object_register(sorcery, "transport", transport_alloc, NULL, transport_apply)) {
 		return -1;
 	}
 
@@ -803,6 +835,7 @@
 	ast_sorcery_object_field_register_custom(sorcery, "transport", "tos", "0", transport_tos_handler, tos_to_str, NULL, 0, 0);
 	ast_sorcery_object_field_register(sorcery, "transport", "cos", "0", OPT_UINT_T, 0, FLDSET(struct ast_sip_transport, cos));
 	ast_sorcery_object_field_register(sorcery, "transport", "websocket_write_timeout", AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT_STR, OPT_INT_T, PARSE_IN_RANGE, FLDSET(struct ast_sip_transport, write_timeout), 1, INT_MAX);
+	ast_sorcery_object_field_register(sorcery, "transport", "allow_reload", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_transport, allow_reload));
 
 	internal_sip_register_endpoint_formatter(&endpoint_transport_formatter);
 
diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c
index a2fefde..693ce65 100644
--- a/res/res_pjsip_outbound_registration.c
+++ b/res/res_pjsip_outbound_registration.c
@@ -508,6 +508,7 @@
 {
 	pj_status_t status;
 	int *callback_invoked;
+	pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
 
 	callback_invoked = ast_threadstorage_get(&register_callback_invoked, sizeof(int));
 	if (!callback_invoked) {
@@ -517,6 +518,12 @@
 
 	/* Due to the message going out the callback may now be invoked, so bump the count */
 	ao2_ref(client_state, +1);
+	/*
+	 * Set the transport in case transports were reloaded.
+	 * When pjproject removes the extraneous error messages produced,
+	 * we can check status and only set the transport and resend if there was an error
+	 */
+	pjsip_regc_set_transport(client_state->client, &selector);
 	status = pjsip_regc_send(client_state->client, tdata);
 
 	/* If the attempt to send the message failed and the callback was not invoked we need to
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index c914641..8bca788 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -1558,6 +1558,26 @@
 	return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
 }
 
+/*!
+ * \internal
+ * \brief Wrapper for pjsip_evsub_send_request
+ *
+ * This function (re)sets the transport before sending to catch cases
+ * where the transport might have changed.
+ *
+ * If pjproject gives us the ability to resend, we'll only reset the transport
+ * if PJSIP_ETPNOTAVAIL is returned from send.
+ *
+ * \returns pj_status_t
+ */
+static pj_status_t internal_pjsip_evsub_send_request( struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
+{
+	pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
+
+	pjsip_dlg_set_transport(sub_tree->dlg, &selector);
+	return pjsip_evsub_send_request(sub_tree->evsub, tdata);
+}
+
 /* XXX This function is not used. */
 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
 		struct ast_sip_endpoint *endpoint, const char *resource)
@@ -1605,7 +1625,7 @@
 	evsub = sub_tree->evsub;
 
 	if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
-		pjsip_evsub_send_request(evsub, tdata);
+		internal_pjsip_evsub_send_request(sub_tree, tdata);
 	} else {
 		/* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
 		 * being called and terminating the subscription. Therefore, we don't
@@ -1684,8 +1704,8 @@
 {
 #ifdef TEST_FRAMEWORK
 	struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
-#endif
 	pjsip_evsub *evsub = sub_tree->evsub;
+#endif
 	int res;
 
 	if (allocate_tdata_buffer(tdata)) {
@@ -1693,7 +1713,8 @@
 		return -1;
 	}
 
-	res = pjsip_evsub_send_request(evsub, tdata) == PJ_SUCCESS ? 0 : -1;
+	res = internal_pjsip_evsub_send_request(sub_tree, tdata);
+
 	subscription_persistence_update(sub_tree, NULL);
 
 	ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
@@ -1702,7 +1723,7 @@
 		pjsip_evsub_get_state_name(evsub),
 		ast_sorcery_object_get_id(endpoint));
 
-	return res;
+	return (res == PJ_SUCCESS ? 0 : -1);
 }
 
 /*!
diff --git a/res/res_pjsip_session.c b/res/res_pjsip_session.c
index 4f5071f..deb96cf 100644
--- a/res/res_pjsip_session.c
+++ b/res/res_pjsip_session.c
@@ -888,10 +888,30 @@
 	return 0;
 }
 
+/*!
+ * \internal
+ * \brief Wrapper for pjsip_inv_send_msg
+ *
+ * This function (re)sets the transport before sending to catch cases
+ * where the transport might have changed.
+ *
+ * If pjproject gives us the ability to resend, we'll only reset the transport
+ * if PJSIP_ETPNOTAVAIL is returned from send.
+ *
+ * \returns pj_status_t
+ */
+static pj_status_t internal_pjsip_inv_send_msg(pjsip_inv_session *inv, pjsip_tx_data *tdata)
+{
+	pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
+
+	pjsip_dlg_set_transport(inv->dlg, &selector);
+	return pjsip_inv_send_msg(inv, tdata);
+}
+
 void ast_sip_session_send_response(struct ast_sip_session *session, pjsip_tx_data *tdata)
 {
 	handle_outgoing_response(session, tdata);
-	pjsip_inv_send_msg(session->inv_session, tdata);
+	internal_pjsip_inv_send_msg(session->inv_session, tdata);
 	return;
 }
 
@@ -1087,7 +1107,8 @@
 	}
 
 	handle_outgoing_request(session, tdata);
-	pjsip_inv_send_msg(session->inv_session, tdata);
+	internal_pjsip_inv_send_msg(session->inv_session, tdata);
+
 	return;
 }
 
@@ -1852,7 +1873,7 @@
 		if (pjsip_inv_initial_answer(inv_session, rdata, 500, NULL, NULL, &tdata) != PJ_SUCCESS) {
 			pjsip_inv_terminate(inv_session, 500, PJ_FALSE);
 		}
-		pjsip_inv_send_msg(inv_session, tdata);
+		internal_pjsip_inv_send_msg(inv_session, tdata);
 		return NULL;
 	}
 	return inv_session;
@@ -2005,7 +2026,7 @@
 		if (pjsip_inv_initial_answer(inv_session, rdata, 500, NULL, NULL, &tdata) == PJ_SUCCESS) {
 			pjsip_inv_terminate(inv_session, 500, PJ_FALSE);
 		} else {
-			pjsip_inv_send_msg(inv_session, tdata);
+			internal_pjsip_inv_send_msg(inv_session, tdata);
 		}
 		return;
 	}
@@ -2015,7 +2036,7 @@
 		if (pjsip_inv_initial_answer(inv_session, rdata, 500, NULL, NULL, &tdata) == PJ_SUCCESS) {
 			pjsip_inv_terminate(inv_session, 500, PJ_FALSE);
 		} else {
-			pjsip_inv_send_msg(inv_session, tdata);
+			internal_pjsip_inv_send_msg(inv_session, tdata);
 		}
 		ao2_cleanup(invite);
 	}

-- 
To view, visit https://gerrit.asterisk.org/2125
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5e759850e25958117d4c02f62ceb7244d7ec9edf
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: George Joseph <george.joseph at fairview5.com>



More information about the asterisk-code-review mailing list