[Asterisk-code-review] res pjsip pubsub: Prune subs with reliable transports at sta... (asterisk[13])

George Joseph asteriskteam at digium.com
Mon Jan 29 06:52:23 CST 2018


George Joseph has uploaded this change for review. ( https://gerrit.asterisk.org/8082 )


Change subject: res_pjsip_pubsub: Prune subs with reliable transports at startup
......................................................................

res_pjsip_pubsub: Prune subs with reliable transports at startup

In an earlier release, inbound registrations on a reliable transport
were pruned on Asterisk restart since the TCP connection would have
been torn down and become unusable when Asterisk stopped.  This same
process is now also applied to inbound subscriptions.
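
As a rough illustration of the "will this contact survive a restart"
decision described above, here is a minimal standalone sketch.  It is not
the code from this change: struct contact_view and will_survive_restart()
are hypothetical names that flatten the values the real helper reads from
the Contact URI, the received packet, and the endpoint configuration.

```c
#include <assert.h>
#include <string.h>
#include <strings.h>   /* strcasecmp (POSIX) */

/* Hypothetical flattened view of the inputs; not an Asterisk type. */
struct contact_view {
	const char *uri_host;            /* host part of the Contact URI */
	int uri_port;                    /* port part of the Contact URI */
	const char *uri_transport_param; /* ;transport= value of the Contact URI */
	const char *src_host;            /* source address of the request */
	int src_port;                    /* source port of the request */
	const char *transport_type;      /* e.g. "TCP", "TLS", "WS", "WSS", "UDP" */
	int transport_is_reliable;       /* non-zero for connection-oriented transports */
	int rewrite_contact;             /* endpoint rewrite_contact setting */
};

/* Returns 1 if the contact should still be usable after a restart, 0 if not. */
static int will_survive_restart(const struct contact_view *c)
{
	const char *type_name;

	/* Only contacts pointing back at a reliable connection's source are at risk. */
	if (c->uri_port != c->src_port
		|| strcmp(c->uri_host, c->src_host) != 0
		|| !c->transport_is_reliable) {
		return 1;
	}

	/* WSS is special: its transport parameter value is "ws". */
	type_name = strcasecmp(c->transport_type, "WSS") == 0 ? "ws" : c->transport_type;

	if (strcasecmp(c->uri_transport_param, type_name) == 0
		&& (c->rewrite_contact
			/* Websocket contacts are always rewritten. */
			|| strcasecmp(c->uri_transport_param, "ws") == 0)) {
		/* The contact was rewritten to the connection's source address. */
		return 0;
	}

	return 1;
}
```

A subscription or registration for which this returns 0 is the kind that
gets marked for pruning, since the connection it points at no longer exists
once Asterisk has stopped.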

To accomplish this, the pjsip_transport_event feature needed to
be refactored to allow multiple monitors (multiple subscriptions or
registrations from the same endpoint) to exist on the same transport.
Since this changed the API, any external modules that may have used the
transport monitor feature (highly unlikely) will need to be changed.
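
To make the "multiple monitors on one transport" idea concrete, here is a
simplified standalone model of the id-plus-matcher scheme.  It is only a
sketch under assumptions: transport_model, monitor_register(),
monitor_unregister() and the fixed-size array are hypothetical stand-ins,
and the ao2 reference counting and locking done by the real code are left
out entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for the monitor types; not the Asterisk API. */
enum monitor_id {
	MONITOR_ID_UNKNOWN,
	MONITOR_ID_INBOUND_REGISTRATION,
	MONITOR_ID_OUTBOUND_REGISTRATION,
	MONITOR_ID_INBOUND_SUBSCRIPTION,
};

typedef void (*shutdown_cb)(void *data);
typedef int (*data_matcher)(void *a, void *b);

struct monitor {
	enum monitor_id id;
	shutdown_cb cb;
	void *data;
};

#define MAX_MONITORS 16

/* One monitored transport holding any number of monitors (up to the cap). */
struct transport_model {
	struct monitor monitors[MAX_MONITORS];
	size_t count;
};

/* Default matcher: plain pointer comparison. */
static int ptr_matcher(void *a, void *b)
{
	return a == b;
}

/* Example matcher for monitors whose data is a C string. */
static int str_matcher(void *a, void *b)
{
	return strcmp(a, b) == 0;
}

/* Append a monitor; duplicates are not checked.  Returns 0 on success. */
static int monitor_register(struct transport_model *t, enum monitor_id id,
	shutdown_cb cb, void *data)
{
	if (t->count == MAX_MONITORS) {
		return -1;
	}
	t->monitors[t->count++] = (struct monitor) { id, cb, data };
	return 0;
}

/*
 * Remove every monitor with the given id whose data matches.
 * NULL data matches all monitors with that id; a NULL matcher
 * falls back to pointer comparison.  Returns the number removed.
 */
static size_t monitor_unregister(struct transport_model *t, enum monitor_id id,
	void *data, data_matcher matches)
{
	size_t removed = 0;
	size_t idx;

	if (!matches) {
		matches = ptr_matcher;
	}
	for (idx = t->count; idx-- > 0;) {
		struct monitor *m = &t->monitors[idx];

		if (m->id == id && (!data || matches(data, m->data))) {
			/* Unordered removal: move the last element into this slot. */
			*m = t->monitors[--t->count];
			removed++;
		}
	}
	return removed;
}
```

Because matching is done on the id and the data rather than on the
callback pointer, two registrations that share a callback on the same
transport can be removed independently, which the callback-only matching
could not do.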

ASTERISK-27612
Reported by: Ross Beer

Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36
---
M CHANGES
M UPGRADE.txt
A contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py
M include/asterisk/res_pjsip.h
M res/res_pjsip.c
M res/res_pjsip/include/res_pjsip_private.h
M res/res_pjsip/pjsip_transport_events.c
M res/res_pjsip_outbound_registration.c
M res/res_pjsip_pubsub.c
M res/res_pjsip_registrar.c
M res/res_pjsip_registrar_expire.c
11 files changed, 355 insertions(+), 83 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/82/8082/1

diff --git a/CHANGES b/CHANGES
index 6e58928..cf5fbcf 100644
--- a/CHANGES
+++ b/CHANGES
@@ -30,6 +30,10 @@
    identifier method split into the "ip" and "header" endpoint identifier
    methods.
 
+ * The pjsip_transport_event feature introduced in 13.18.0 has been refactored.
+   Any external modules that may have used that feature (highly unlikely) will
+   need to be changed as the API has been altered slightly.
+
 res_pjsip_endpoint_identifier_ip
 ------------------
  * The endpoint identifier "ip" method previously recognized endpoints either
@@ -44,6 +48,17 @@
    you can now predict which endpoint is matched when a request comes in that
    matches both.
 
+res_pjsip_pubsub
+------------------
+ * In an earlier release, inbound registrations on a reliable transport
+   were pruned on Asterisk restart since the TCP connection would have
+   been torn down and become unusable when Asterisk stopped.  This same
+   process is now also applied to inbound subscriptions.  Since this
+   required the addition of a new column to the ps_subscription_persistence
+   realtime table, users who store their subscriptions in a database will
+   need to run the "alembic upgrade head" process to add the column to
+   the schema.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 13.18.0 to Asterisk 13.19.0 ----------
 ------------------------------------------------------------------------------
diff --git a/UPGRADE.txt b/UPGRADE.txt
index ff246dd..18200f9 100644
--- a/UPGRADE.txt
+++ b/UPGRADE.txt
@@ -30,6 +30,10 @@
    identifier method split into the "ip" and "header" endpoint identifier
    methods.
 
+ * The pjsip_transport_event feature introduced in 13.18.0 has been refactored.
+   Any external modules that may have used that feature (highly unlikely) will
+   need to be changed as the API has been altered slightly.
+
 res_pjsip_endpoint_identifier_ip
 ------------------
  * The endpoint identifier "ip" method previously recognized endpoints either
@@ -44,6 +48,7 @@
    you can now predict which endpoint is matched when a request comes in that
    matches both.
 
+
 From 13.17.0 to 13.18.0:
 
 Core:
diff --git a/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py b/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py
new file mode 100644
index 0000000..b0bbe13
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py
@@ -0,0 +1,33 @@
+"""add prune_on_restart to ps_subscription_persistence
+
+Revision ID: d3e4284f8707
+Revises: 52798ad97bdf
+Create Date: 2018-01-28 17:45:36.218123
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'd3e4284f8707'
+down_revision = '52798ad97bdf'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+YESNO_NAME = 'yesno_values'
+YESNO_VALUES = ['yes', 'no']
+
+
+def upgrade():
+    ############################# Enums ##############################
+
+    # yesno_values have already been created, so use postgres enum object
+    # type to get around "already created" issue - works okay with mysql
+    yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False)
+
+    op.add_column('ps_subscription_persistence', sa.Column('prune_on_restart', yesno_values))
+
+def downgrade():
+    if op.get_context().bind.dialect.name == 'mssql':
+        op.drop_constraint('ck_ps_subscription_persistence_prune_on_restart_yesno_values','ps_subscription_persistence')
+    op.drop_column('ps_subscription_persistence', 'prune_on_restart')
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 2d3609b..ac1ac34 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -3009,6 +3009,18 @@
  */
 typedef void (*ast_transport_monitor_shutdown_cb)(void *data);
 
+/*!
+ * \brief Transport shutdown monitor data matcher
+ * \since 13.20.0
+ *
+ * \param a User data to compare.
+ * \param b User data to compare.
+ *
+ * \retval 1 The data objects match
+ * \retval 0 The data objects don't match
+ */
+typedef int (*ast_transport_monitor_data_matcher)(void *a, void *b);
+
 enum ast_transport_monitor_reg {
 	/*! \brief Successfully registered the transport monitor */
 	AST_TRANSPORT_MONITOR_REG_SUCCESS,
@@ -3023,39 +3035,71 @@
 	AST_TRANSPORT_MONITOR_REG_FAILED,
 };
 
+enum ast_transport_monitor_id {
+	/*! \brief Unknown */
+	AST_TRANSPORT_MONITOR_ID_UNKNOWN,
+	/*! \brief Dynamic contact registrations */
+	AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,
+	/*! \brief Outbound registrations */
+	AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,
+	/*! \brief Inbound Subscriptions. */
+	AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION,
+};
+
 /*!
  * \brief Register a reliable transport shutdown monitor callback.
- * \since 13.18.0
+ * \since 13.20.0
  *
  * \param transport Transport to monitor for shutdown.
  * \param cb Who to call when transport is shutdown.
+ * \param id ID used for matching unregisters.
  * \param ao2_data Data to pass with the callback.
+ *
+ * \note The data object passed will have its reference count automatically
+ * incremented by this call and automatically decremented after the callback
+ * runs or when the callback is unregistered.
+ *
+ * There is no checking for duplicate registrations.
  *
  * \return enum ast_transport_monitor_reg
  */
 enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,
-	ast_transport_monitor_shutdown_cb cb, void *ao2_data);
+	enum ast_transport_monitor_id id, ast_transport_monitor_shutdown_cb cb, void *ao2_data);
 
 /*!
- * \brief Unregister a reliable transport shutdown monitor callback.
- * \since 13.18.0
+ * \brief Unregister a reliable transport shutdown monitor
+ * \since 13.20.0
  *
  * \param transport Transport to monitor for shutdown.
- * \param cb Who to call when transport is shutdown.
+ * \param id ast_transport_monitor_id to match.
+ * \param data Data to pass to the matcher. May be NULL and does NOT need to be an ao2 object.
+ * \param matches Matcher function that returns true if data matches the previously
+ *                registered data object.  If NULL, a simple pointer comparison is done.
+ *
+ * \note The data object passed into the original register will have its reference count
+ * automatically decremented.
  *
  * \return Nothing
  */
-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb);
+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,
+	enum ast_transport_monitor_id id, void *data, ast_transport_monitor_data_matcher matches);
 
 /*!
- * \brief Unregister monitor callback from all reliable transports.
- * \since 13.18.0
+ * \brief Unregister a transport shutdown monitor from all reliable transports
+ * \since 13.20.0
  *
- * \param cb Who to call when a transport is shutdown.
+ * \param id ast_transport_monitor_id to match.
+ * \param data Data to pass to the matcher. May be NULL and does NOT need to be an ao2 object.
+ * \param matches Matcher function that returns true if data matches the previously
+ *                registered data object.  If NULL, a simple pointer comparison is done.
+ *
+ * \note The data object passed into the original register will have its reference count
+ * automatically decremented.
  *
  * \return Nothing
  */
-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb);
+void ast_sip_transport_monitor_unregister_all(enum ast_transport_monitor_id id,
+	void *data, ast_transport_monitor_data_matcher matches);
 
 /*! Transport state notification registration element.  */
 struct ast_sip_tpmgr_state_callback {
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 05e10d3..013e359 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -2894,6 +2894,50 @@
 	return ast_pjsip_endpoint;
 }
 
+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, pjsip_rx_data *rdata)
+{
+	struct ast_sip_endpoint *endpoint = ast_pjsip_rdata_get_endpoint(rdata);
+	pj_str_t host_name;
+	int result = 1;
+
+	if (!endpoint) {
+		return 1;
+	}
+
+	/* Determine if the contact cannot survive a restart/boot. */
+	if (uri->port == rdata->pkt_info.src_port
+		&& !pj_strcmp(&uri->host,
+			pj_cstr(&host_name, rdata->pkt_info.src_name))
+		/* We have already checked if the URI scheme is sip: or sips: */
+		&& PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {
+		pj_str_t type_name;
+
+		/* Determine the transport parameter value */
+		if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {
+			/* WSS is special, as it needs to be ws. */
+			pj_cstr(&type_name, "ws");
+		} else {
+			pj_cstr(&type_name, rdata->tp_info.transport->type_name);
+		}
+
+		if (!pj_stricmp(&uri->transport_param, &type_name)
+			&& (endpoint->nat.rewrite_contact
+				/* Websockets are always rewritten */
+				|| !pj_stricmp(&uri->transport_param,
+					pj_cstr(&type_name, "ws")))) {
+			/*
+			 * The contact was rewritten to the reliable transport's
+			 * source address.  Disconnecting the transport for any
+			 * reason invalidates the contact.
+			 */
+			result = 0;
+		}
+	}
+
+	ao2_cleanup(endpoint);
+	return result;
+}
+
 int ast_sip_get_transport_name(const struct ast_sip_endpoint *endpoint,
 	pjsip_sip_uri *sip_uri, char *buf, size_t buf_len)
 {
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index 151f598..9bada9d 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -419,4 +419,16 @@
  */
 void internal_res_pjsip_unref(void);
 
+/*!
+ * \internal
+ * \brief Determines if a uri will still be valid after an asterisk restart
+ * \since 13.20.0
+ *
+ * \param uri uri to test
+ * \param rdata the rdata to get transport and endpoint information from
+ *
+ * \retval 1 Yes, 0 No
+ */
+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, pjsip_rx_data *rdata);
+
 #endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c
index 0f57303..5c9c664 100644
--- a/res/res_pjsip/pjsip_transport_events.c
+++ b/res/res_pjsip/pjsip_transport_events.c
@@ -41,6 +41,8 @@
 
 /*! Who to notify when transport shuts down. */
 struct transport_monitor_notifier {
+	/*! An opaque ID for matching unregisters. */
+	enum ast_transport_monitor_id id;
 	/*! Who to call when transport shuts down. */
 	ast_transport_monitor_shutdown_cb cb;
 	/*! ao2 data object to pass to callback. */
@@ -135,7 +137,7 @@
 				break;
 			}
 			monitored->transport = transport;
-			if (AST_VECTOR_INIT(&monitored->monitors, 2)) {
+			if (AST_VECTOR_INIT(&monitored->monitors, 5)) {
 				ao2_ref(monitored, -1);
 				break;
 			}
@@ -166,6 +168,8 @@
 					struct transport_monitor_notifier *notifier;
 
 					notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+					ast_debug(3, "running callback %p(%d,%p) for transport %s\n",
+						notifier->cb, notifier->id, notifier->data, transport->obj_name);
 					notifier->cb(notifier->data);
 				}
 				ao2_ref(monitored, -1);
@@ -195,42 +199,64 @@
 	}
 }
 
-static int transport_monitor_unregister_all(void *obj, void *arg, int flags)
+struct callback_data {
+	enum ast_transport_monitor_id id;
+	void *data;
+	ast_transport_monitor_data_matcher matches;
+};
+
+static int transport_monitor_unregister_cb(void *obj, void *arg, int flags)
 {
 	struct transport_monitor *monitored = obj;
-	ast_transport_monitor_shutdown_cb cb = arg;
+	struct callback_data *cb = arg;
 	int idx;
 
 	for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
 		struct transport_monitor_notifier *notifier;
 
 		notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
-		if (notifier->cb == cb) {
+		if (notifier->id == cb->id && (!cb->data || cb->matches(cb->data, notifier->data))) {
 			ao2_cleanup(notifier->data);
 			AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);
-			break;
+			ast_debug(3, "Unregistered monitor %p(%d,%p) from transport %s\n",
+				notifier->cb, notifier->id, notifier->data, monitored->transport->obj_name);
 		}
 	}
 	return 0;
 }
 
-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb)
+static int ptr_matcher(void *a, void *b)
+{
+	return a == b;
+}
+
+void ast_sip_transport_monitor_unregister_all(enum ast_transport_monitor_id id,
+	void *data, ast_transport_monitor_data_matcher matches)
 {
 	struct ao2_container *transports;
+	struct callback_data cb = {
+		.id = id,
+		.data = data,
+		.matches = matches ?: ptr_matcher,
+	};
+
+	ast_assert(id != AST_TRANSPORT_MONITOR_ID_UNKNOWN);
 
 	transports = ao2_global_obj_ref(active_transports);
 	if (!transports) {
 		return;
 	}
-	ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all,
-		cb);
+	ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_cb, &cb);
 	ao2_ref(transports, -1);
 }
 
-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb)
+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,
+	enum ast_transport_monitor_id id, void *data, ast_transport_monitor_data_matcher matches)
 {
 	struct ao2_container *transports;
 	struct transport_monitor *monitored;
+
+	ast_assert(transport && id != AST_TRANSPORT_MONITOR_ID_UNKNOWN);
 
 	transports = ao2_global_obj_ref(active_transports);
 	if (!transports) {
@@ -240,18 +266,13 @@
 	ao2_lock(transports);
 	monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
 	if (monitored) {
-		int idx;
+		struct callback_data cb = {
+			.id = id,
+			.data = data,
+			.matches = matches ?: ptr_matcher,
+		};
 
-		for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
-			struct transport_monitor_notifier *notifier;
-
-			notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
-			if (notifier->cb == cb) {
-				ao2_cleanup(notifier->data);
-				AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);
-				break;
-			}
-		}
+		transport_monitor_unregister_cb(monitored, &cb, 0);
 		ao2_ref(monitored, -1);
 	}
 	ao2_unlock(transports);
@@ -259,7 +280,7 @@
 }
 
 enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,
-	ast_transport_monitor_shutdown_cb cb, void *ao2_data)
+	enum ast_transport_monitor_id id, ast_transport_monitor_shutdown_cb cb, void *ao2_data)
 {
 	struct ao2_container *transports;
 	struct transport_monitor *monitored;
@@ -273,31 +294,23 @@
 	ao2_lock(transports);
 	monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
 	if (monitored) {
-		int idx;
 		struct transport_monitor_notifier new_monitor;
-
-		/* Check if the callback monitor already exists */
-		for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
-			struct transport_monitor_notifier *notifier;
-
-			notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
-			if (notifier->cb == cb) {
-				/* The monitor is already in the vector replace with new ao2_data. */
-				ao2_replace(notifier->data, ao2_data);
-				res = AST_TRANSPORT_MONITOR_REG_REPLACED;
-				goto register_done;
-			}
-		}
 
 		/* Add new monitor to vector */
 		new_monitor.cb = cb;
+		new_monitor.id = id;
 		new_monitor.data = ao2_bump(ao2_data);
 		if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) {
 			ao2_cleanup(ao2_data);
 			res = AST_TRANSPORT_MONITOR_REG_FAILED;
+			ast_debug(3, "Register monitor %p(%d,%p) to transport %s FAILED\n",
+				cb, id, ao2_data, transport->obj_name);
+		} else {
+			res = AST_TRANSPORT_MONITOR_REG_SUCCESS;
+			ast_debug(3, "Registered monitor %p(%d,%p) to transport %s\n",
+				cb, id, ao2_data, transport->obj_name);
 		}
 
-register_done:
 		ao2_ref(monitored, -1);
 	}
 	ao2_unlock(transports);
diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c
index 0b177ae..8122084 100644
--- a/res/res_pjsip_outbound_registration.c
+++ b/res/res_pjsip_outbound_registration.c
@@ -851,6 +851,14 @@
 	}
 }
 
+static int monitor_matcher(void *a, void *b)
+{
+	char *ma = a;
+	char *mb = b;
+
+	return strcmp(ma, mb) == 0;
+}
+
 static void registration_transport_monitor_setup(pjsip_transport *transport, const char *registration_name)
 {
 	char *monitor;
@@ -870,8 +878,8 @@
 	 * register the monitor.  We might get into a message spamming infinite
 	 * loop of registration, shutdown, reregistration...
 	 */
-	ast_sip_transport_monitor_register(transport, registration_transport_shutdown_cb,
-		monitor);
+	ast_sip_transport_monitor_register(transport, AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,
+		registration_transport_shutdown_cb, monitor);
 	ao2_ref(monitor, -1);
 }
 
@@ -951,7 +959,8 @@
 			ast_debug(1, "Outbound unregistration to '%s' with client '%s' successful\n", server_uri, client_uri);
 			update_client_state_status(response->client_state, SIP_REGISTRATION_UNREGISTERED);
 			ast_sip_transport_monitor_unregister(response->rdata->tp_info.transport,
-				registration_transport_shutdown_cb);
+				AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION, response->client_state->registration_name,
+				monitor_matcher);
 		}
 	} else if (response->client_state->destroy) {
 		/* We need to deal with the pending destruction instead. */
@@ -2150,7 +2159,8 @@
 
 	ao2_global_obj_release(current_states);
 
-	ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb);
+	ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,
+		NULL, NULL);
 
 	/* Wait for registration serializers to get destroyed. */
 	ast_debug(2, "Waiting for registration transactions to complete for unload.\n");
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index 88005b8..c00da80 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -127,6 +127,11 @@
 				<configOption name="contact_uri">
 					<synopsis>The Contact URI of the dialog for the subscription</synopsis>
 				</configOption>
+				<configOption name="prune_on_restart">
+					<synopsis>If set, indicates that the contact used a reliable transport
+					and therefore the subscription must be deleted after an asterisk restart
+					</synopsis>
+				</configOption>
 			</configObject>
 			<configObject name="resource_list">
 				<synopsis>Resource list configuration parameters.</synopsis>
@@ -382,6 +387,8 @@
 	struct timeval expires;
 	/*! Contact URI */
 	char contact_uri[PJSIP_MAX_URL_SIZE];
+	/*! Prune subscription on restart */
+	int prune_on_restart;
 };
 
 /*!
@@ -446,6 +453,10 @@
 	 * capable of restarting the timer.
 	 */
 	struct ast_sip_sched_task *expiration_task;
+	/*! The transport the subscription was received on.
+	 * Only used for reliable transports.
+	 */
+	pjsip_transport *transport;
 };
 
 /*!
@@ -549,6 +560,17 @@
 	return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
 }
 
+static void sub_tree_transport_cb(void *data) {
+	struct sip_subscription_tree *sub_tree = data;
+
+	ast_debug(3, "Transport destroyed.  Removing subscription '%s->%s'  prune on restart: %d\n",
+		sub_tree->persistence->endpoint, sub_tree->root->resource,
+		sub_tree->persistence->prune_on_restart);
+
+	sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
+	pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+}
+
 /*! \brief Destructor for subscription persistence */
 static void subscription_persistence_destroy(void *obj)
 {
@@ -599,8 +621,9 @@
 		return;
 	}
 
-	ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,
-		sub_tree->root->resource);
+	ast_debug(3, "Updating persistence for '%s->%s'  prune on restart: %s\n",
+		sub_tree->persistence->endpoint, sub_tree->root->resource,
+		sub_tree->persistence->prune_on_restart ? "yes" : "no");
 
 	dlg = sub_tree->dlg;
 	sub_tree->persistence->cseq = dlg->local.cseq;
@@ -614,6 +637,25 @@
 		sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
 
 		if (contact_hdr) {
+			if (contact_hdr) {
+				if (type == SUBSCRIPTION_PERSISTENCE_CREATED) {
+					sub_tree->persistence->prune_on_restart =
+						!ast_sip_will_uri_survive_restart(
+							(pjsip_sip_uri *)pjsip_uri_get_uri(contact_hdr->uri), rdata);
+
+					if (sub_tree->persistence->prune_on_restart) {
+						ast_debug(3, "adding transport monitor on %s for '%s->%s'  prune on restart: %d\n",
+							rdata->tp_info.transport->obj_name,
+							sub_tree->persistence->endpoint, sub_tree->root->resource,
+							sub_tree->persistence->prune_on_restart);
+						sub_tree->transport = rdata->tp_info.transport;
+						ast_sip_transport_monitor_register(rdata->tp_info.transport,
+							AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, sub_tree_transport_cb, sub_tree);
+						// N.B.  ast_sip_transport_monitor_register holds a reference to the sub_tree
+					}
+				}
+			}
+
 			pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,
 					sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));
 		} else {
@@ -654,6 +696,15 @@
 {
 	if (!sub_tree->persistence) {
 		return;
+	}
+
+	if (sub_tree->persistence->prune_on_restart && sub_tree->transport) {
+		ast_debug(3, "Unregistering transport monitor on %s '%s->%s'\n",
+			sub_tree->transport->obj_name,
+			sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
+			sub_tree->root ? sub_tree->root->resource : "Unknown");
+		ast_sip_transport_monitor_unregister(sub_tree->transport,
+			AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, sub_tree, NULL);
 	}
 
 	ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
@@ -1585,6 +1636,14 @@
 	struct ast_taskprocessor *serializer;
 	pjsip_rx_data rdata;
 	struct persistence_recreate_data recreate_data;
+
+	/* If this subscription used a reliable transport it can't be reestablished so remove it */
+	if (persistence->prune_on_restart) {
+		ast_debug(3, "Deleting subscription marked as 'prune' from persistent store '%s' %s\n",
+			persistence->endpoint, persistence->tag);
+		ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+		return 0;
+	}
 
 	/* If this subscription has already expired remove it */
 	if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
@@ -2989,6 +3048,7 @@
 		ind->expires = -1;
 
 		sub_tree->persistence = subscription_persistence_create(sub_tree);
+
 		subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);
 		sip_subscription_accept(sub_tree, rdata, resp);
 		if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
@@ -5483,6 +5543,8 @@
 		persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
 	ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0,
 		CHARFLDSET(struct subscription_persistence, contact_uri));
+	ast_sorcery_object_field_register(sorcery, "subscription_persistence", "prune_on_restart", "0", OPT_UINT_T, 0,
+		FLDSET(struct subscription_persistence, prune_on_restart));
 
 	if (apply_list_configuration(sorcery)) {
 		ast_sched_context_destroy(sched);
@@ -5559,6 +5621,8 @@
 	AST_TEST_UNREGISTER(loop);
 	AST_TEST_UNREGISTER(bad_event);
 
+	ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, NULL, NULL);
+
 	ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
 
 	ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c
index cdf045f..2cdd87c 100644
--- a/res/res_pjsip_registrar.c
+++ b/res/res_pjsip_registrar.c
@@ -327,6 +327,15 @@
 	char aor_name[0];
 };
 
+static int contact_transport_monitor_matcher(void *a, void *b)
+{
+	struct contact_transport_monitor *ma = a;
+	struct contact_transport_monitor *mb = b;
+
+	return strcmp(ma->aor_name, mb->aor_name) == 0
+		&& strcmp(ma->contact_name, mb->contact_name) == 0;
+}
+
 static void register_contact_transport_shutdown_cb(void *data)
 {
 	struct contact_transport_monitor *monitor = data;
@@ -579,7 +588,6 @@
 		contact = ao2_callback(contacts, OBJ_UNLINK, registrar_find_contact, &details);
 		if (!contact) {
 			int prune_on_boot = 0;
-			pj_str_t host_name;
 
 			/* If they are actually trying to delete a contact that does not exist... be forgiving */
 			if (!expiration) {
@@ -588,35 +596,7 @@
 				continue;
 			}
 
-			/* Determine if the contact cannot survive a restart/boot. */
-			if (details.uri->port == rdata->pkt_info.src_port
-				&& !pj_strcmp(&details.uri->host,
-					pj_cstr(&host_name, rdata->pkt_info.src_name))
-				/* We have already checked if the URI scheme is sip: or sips: */
-				&& PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {
-				pj_str_t type_name;
-
-				/* Determine the transport parameter value */
-				if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {
-					/* WSS is special, as it needs to be ws. */
-					pj_cstr(&type_name, "ws");
-				} else {
-					pj_cstr(&type_name, rdata->tp_info.transport->type_name);
-				}
-
-				if (!pj_stricmp(&details.uri->transport_param, &type_name)
-					&& (endpoint->nat.rewrite_contact
-						/* Websockets are always rewritten */
-						|| !pj_stricmp(&details.uri->transport_param,
-							pj_cstr(&type_name, "ws")))) {
-					/*
-					 * The contact was rewritten to the reliable transport's
-					 * source address.  Disconnecting the transport for any
-					 * reason invalidates the contact.
-					 */
-					prune_on_boot = 1;
-				}
-			}
+			prune_on_boot = !ast_sip_will_uri_survive_restart(details.uri, rdata);
 
 			contact = ast_sip_location_create_contact(aor, contact_uri,
 				ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)),
@@ -645,6 +625,7 @@
 					strcpy(monitor->contact_name, contact_name);/* Safe */
 
 					ast_sip_transport_monitor_register(rdata->tp_info.transport,
+						AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,
 						register_contact_transport_shutdown_cb, monitor);
 					ao2_ref(monitor, -1);
 				}
@@ -703,6 +684,21 @@
 					contact_update->user_agent);
 			ao2_cleanup(contact_update);
 		} else {
+			if (contact->prune_on_boot) {
+				struct contact_transport_monitor *monitor;
+				const char *contact_name =
+					ast_sorcery_object_get_id(contact);
+
+				monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(aor_name)
+					+ strlen(contact_name));
+				strcpy(monitor->aor_name, aor_name);/* Safe */
+				monitor->contact_name = monitor->aor_name + strlen(aor_name) + 1;
+				strcpy(monitor->contact_name, contact_name);/* Safe */
+
+				ast_sip_transport_monitor_unregister(rdata->tp_info.transport,
+					AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION, monitor, contact_transport_monitor_matcher);
+			}
+
 			/* We want to report the user agent that was actually in the removed contact */
 			ast_sip_location_delete_contact(contact);
 			ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name);
@@ -1135,7 +1131,7 @@
 	ast_manager_unregister(AMI_SHOW_REGISTRATIONS);
 	ast_manager_unregister(AMI_SHOW_REGISTRATION_CONTACT_STATUSES);
 	ast_sip_unregister_service(&registrar_module);
-	ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb);
+	ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION, NULL, NULL);
 	return 0;
 }
 
diff --git a/res/res_pjsip_registrar_expire.c b/res/res_pjsip_registrar_expire.c
index e802733..0a29baf 100644
--- a/res/res_pjsip_registrar_expire.c
+++ b/res/res_pjsip_registrar_expire.c
@@ -32,6 +32,27 @@
 #include "asterisk/module.h"
 #include "asterisk/named_locks.h"
 
+//  These are common with res_pjsip_registrar!
+/*! Transport monitor for incoming REGISTER contacts */
+struct contact_transport_monitor {
+	/*!
+	 * \brief Sorcery contact name to remove on transport shutdown
+	 * \note Stored after aor_name in space reserved when struct allocated.
+	 */
+	char *contact_name;
+	/*! AOR name the contact is associated */
+	char aor_name[0];
+};
+
+static int contact_transport_monitor_matcher(void *a, void *b)
+{
+	struct contact_transport_monitor *ma = a;
+	struct contact_transport_monitor *mb = b;
+
+	return strcmp(ma->aor_name, mb->aor_name) == 0
+		&& strcmp(ma->contact_name, mb->contact_name) == 0;
+}
+
 /*! \brief Thread keeping things alive */
 static pthread_t check_thread = AST_PTHREADT_NULL;
 
@@ -55,6 +76,21 @@
 	 */
 	ao2_lock(lock);
 	if (ast_tvdiff_ms(ast_tvnow(), contact->expiration_time) > 0) {
+		if (contact->prune_on_boot) {
+			struct contact_transport_monitor *monitor;
+			const char *contact_name =
+				ast_sorcery_object_get_id(contact);
+
+			monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(contact->aor)
+				+ strlen(contact_name));
+			strcpy(monitor->aor_name, contact->aor);/* Safe */
+			monitor->contact_name = monitor->aor_name + strlen(contact->aor) + 1;
+			strcpy(monitor->contact_name, contact_name);/* Safe */
+
+			// We don't have a pointer to transport here so we remove from all transports.
+			ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,
+				monitor, contact_transport_monitor_matcher);
+		}
 		ast_sip_location_delete_contact(contact);
 	}
 	ao2_unlock(lock);

-- 
To view, visit https://gerrit.asterisk.org/8082
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36
Gerrit-Change-Number: 8082
Gerrit-PatchSet: 1
Gerrit-Owner: George Joseph <gjoseph at digium.com>