<p>George Joseph has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/8083">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">res_pjsip_pubsub: Prune subs with reliable transports at startup<br><br>In an earlier release, inbound registrations on a reliable transport<br>were pruned on Asterisk restart since the TCP connection would have<br>been torn down and become unusable when Asterisk stopped. This same<br>process is now also applied to inbound subscriptions.<br><br>To accomplish this, the pjsip_transport_event feature needed to<br>be refactored to allow multiple monitors (multiple subscriptions or<br>registrations from the same endpoint) to exist on the same transport.<br>Since this changed the API, any external modules that may have used the<br>transport monitor feature (highly unlikely) will need to be changed.<br><br>ASTERISK-27612<br>Reported by: Ross Beer<br><br>Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36<br>---<br>M CHANGES<br>M UPGRADE.txt<br>A contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py<br>M include/asterisk/res_pjsip.h<br>M res/res_pjsip.c<br>M res/res_pjsip/include/res_pjsip_private.h<br>M res/res_pjsip/pjsip_transport_events.c<br>M res/res_pjsip_outbound_registration.c<br>M res/res_pjsip_pubsub.c<br>M res/res_pjsip_registrar.c<br>M res/res_pjsip_registrar_expire.c<br>11 files changed, 354 insertions(+), 83 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/83/8083/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">diff --git a/CHANGES b/CHANGES<br>index 2f6c448..6b3c8cd 100644<br>--- a/CHANGES<br>+++ b/CHANGES<br>@@ -31,6 +31,10 @@<br> identifier method split into the "ip" and "header" endpoint identifier<br> methods.<br> <br>+ * The pjsip_transport_event feature introduced in 
15.1.0 has been refactored.<br>+ Any external modules that may have used that feature (highly unlikely) will<br>+ need to be changed as the API has been altered slightly.<br>+<br> res_pjsip_endpoint_identifier_ip<br> ------------------<br> * The endpoint identifier "ip" method previously recognized endpoints either<br>@@ -49,6 +53,17 @@<br> ------------------<br> * Removed the unused and incomplete SDP processing modules.<br> <br>+res_pjsip_pubsub<br>+------------------<br>+ * In an earlier release, inbound registrations on a reliable transport<br>+ were pruned on Asterisk restart since the TCP connection would have<br>+ been torn down and become unusable when Asterisk stopped. This same<br>+ process is now also applied to inbound subscriptions. Since this<br>+ required the addition of a new column to the ps_subscription_persistence<br>+ realtime table, users who store their subscriptions in a database will<br>+ need to run the "alembic upgrade head" process to add the column to<br>+ the schema. 
<br>+<br> ------------------------------------------------------------------------------<br> --- Functionality changes from Asterisk 15.1.0 to Asterisk 15.2.0 ------------<br> ------------------------------------------------------------------------------<br>diff --git a/UPGRADE.txt b/UPGRADE.txt<br>index d47bbe3..3a1858b 100644<br>--- a/UPGRADE.txt<br>+++ b/UPGRADE.txt<br>@@ -32,6 +32,10 @@<br> identifier method split into the "ip" and "header" endpoint identifier<br> methods.<br> <br>+ * The pjsip_transport_event feature introduced in 15.1.0 has been refactored.<br>+ Any external modules that may have used that feature (highly unlikely) will<br>+ need to be changed as the API has been altered slightly.<br>+<br> res_pjsip_endpoint_identifier_ip<br> ------------------<br> * The endpoint identifier "ip" method previously recognized endpoints either<br>diff --git a/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py b/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py<br>new file mode 100644<br>index 0000000..b0bbe13<br>--- /dev/null<br>+++ b/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py<br>@@ -0,0 +1,33 @@<br>+"""add prune_on_restart to ps_subscription_persistence<br>+<br>+Revision ID: d3e4284f8707<br>+Revises: 52798ad97bdf<br>+Create Date: 2018-01-28 17:45:36.218123<br>+<br>+"""<br>+<br>+# revision identifiers, used by Alembic.<br>+revision = 'd3e4284f8707'<br>+down_revision = '52798ad97bdf'<br>+<br>+from alembic import op<br>+import sqlalchemy as sa<br>+from sqlalchemy.dialects.postgresql import ENUM<br>+<br>+YESNO_NAME = 'yesno_values'<br>+YESNO_VALUES = ['yes', 'no']<br>+<br>+<br>+def upgrade():<br>+ ############################# Enums ##############################<br>+<br>+ # yesno_values have already been created, so use postgres enum object<br>+ # type to get around "already created" issue - works okay with mysql<br>+ 
yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False)<br>+<br>+ op.add_column('ps_subscription_persistence', sa.Column('prune_on_restart', yesno_values))<br>+<br>+def downgrade():<br>+ if op.get_context().bind.dialect.name == 'mssql':<br>+ op.drop_constraint('ck_ps_subscription_persistence_prune_on_restart_yesno_values','ps_subscription_persistence')<br>+ op.drop_column('ps_subscription_persistence', 'prune_on_restart')<br>diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h<br>index 330e49f..a1b71a8 100644<br>--- a/include/asterisk/res_pjsip.h<br>+++ b/include/asterisk/res_pjsip.h<br>@@ -2992,6 +2992,18 @@<br> */<br> typedef void (*ast_transport_monitor_shutdown_cb)(void *data);<br> <br>+/*!<br>+ * \brief Transport shutdown monitor data matcher<br>+ * \since 13.20.0<br>+ *<br>+ * \param a User data to compare.<br>+ * \param b User data to compare.<br>+ *<br>+ * \retval 1 The data objects match<br>+ * \retval 0 The data objects don't match<br>+ */<br>+typedef int (*ast_transport_monitor_data_matcher)(void *a, void *b);<br>+<br> enum ast_transport_monitor_reg {<br> /*! \brief Successfully registered the transport monitor */<br> AST_TRANSPORT_MONITOR_REG_SUCCESS,<br>@@ -3006,39 +3018,71 @@<br> AST_TRANSPORT_MONITOR_REG_FAILED,<br> };<br> <br>+enum ast_transport_monitor_id {<br>+ /*! \brief Unknown */<br>+ AST_TRANSPORT_MONITOR_ID_UNKNOWN,<br>+ /*! \brief Dynamic contact registrations */<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,<br>+ /*! \brief Outbound registrations */<br>+ AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,<br>+ /*! \brief Inbound Subscriptions. 
*/<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION,<br>+};<br>+<br> /*!<br> * \brief Register a reliable transport shutdown monitor callback.<br>- * \since 13.18.0<br>+ * \since 13.20.0<br> *<br> * \param transport Transport to monitor for shutdown.<br> * \param cb Who to call when transport is shutdown.<br>+ * \param id ID used for matching unregisters.<br> * \param ao2_data Data to pass with the callback.<br>+ *<br>+ * \note The data object passed will have its reference count automatically<br>+ * incremented by this call and automatically decremented after the callback<br>+ * runs or when the callback is unregistered.<br>+ *<br>+ * There is no checking for duplicate registrations.<br> *<br> * \return enum ast_transport_monitor_reg<br> */<br> enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,<br>- ast_transport_monitor_shutdown_cb cb, void *ao2_data);<br>+ enum ast_transport_monitor_id id, ast_transport_monitor_shutdown_cb cb, void *ao2_data);<br> <br> /*!<br>- * \brief Unregister a reliable transport shutdown monitor callback.<br>- * \since 13.18.0<br>+ * \brief Unregister a reliable transport shutdown monitor<br>+ * \since 13.20.0<br> *<br> * \param transport Transport to monitor for shutdown.<br>- * \param cb Who to call when transport is shutdown.<br>+ * \param id ast_transport_monitor_id to match.<br>+ * \param data Data to pass to the matcher. May be NULL and does NOT need to be an ao2 object.<br>+ * \param matches Matcher function that returns true if data matches the previously<br>+ * registered data object. 
If NULL, a simple pointer comparison is done.<br>+ *<br>+ * \note The data object passed into the original register will have its reference count<br>+ * automatically decremented.<br> *<br> * \return Nothing<br> */<br>-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb);<br>+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,<br>+ enum ast_transport_monitor_id id, void *data, ast_transport_monitor_data_matcher matches);<br> <br> /*!<br>- * \brief Unregister monitor callback from all reliable transports.<br>- * \since 13.18.0<br>+ * \brief Unregister a transport shutdown monitor from all reliable transports<br>+ * \since 13.20.0<br> *<br>- * \param cb Who to call when a transport is shutdown.<br>+ * \param id ast_transport_monitor_id to match.<br>+ * \param data Data to pass to the matcher. May be NULL and does NOT need to be an ao2 object.<br>+ * \param matches Matcher function that returns true if data matches the previously<br>+ * registered data object. If NULL, a simple pointer comparison is done.<br>+ *<br>+ * \note The data object passed into the original register will have its reference count<br>+ * automatically decremented.<br> *<br> * \return Nothing<br> */<br>-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb);<br>+void ast_sip_transport_monitor_unregister_all(enum ast_transport_monitor_id id,<br>+ void *data, ast_transport_monitor_data_matcher matches);<br> <br> /*! Transport state notification registration element. 
*/<br> struct ast_sip_tpmgr_state_callback {<br>diff --git a/res/res_pjsip.c b/res/res_pjsip.c<br>index 23bf713..dc14088 100644<br>--- a/res/res_pjsip.c<br>+++ b/res/res_pjsip.c<br>@@ -2943,6 +2943,50 @@<br> return ast_pjsip_endpoint;<br> }<br> <br>+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, pjsip_rx_data *rdata)<br>+{<br>+ struct ast_sip_endpoint *endpoint = ast_pjsip_rdata_get_endpoint(rdata);<br>+ pj_str_t host_name;<br>+ int result = 1;<br>+<br>+ if (!endpoint) {<br>+ return 1;<br>+ }<br>+<br>+ /* Determine if the contact cannot survive a restart/boot. */<br>+ if (uri->port == rdata->pkt_info.src_port<br>+ && !pj_strcmp(&uri->host,<br>+ pj_cstr(&host_name, rdata->pkt_info.src_name))<br>+ /* We have already checked if the URI scheme is sip: or sips: */<br>+ && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {<br>+ pj_str_t type_name;<br>+<br>+ /* Determine the transport parameter value */<br>+ if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {<br>+ /* WSS is special, as it needs to be ws. */<br>+ pj_cstr(&type_name, "ws");<br>+ } else {<br>+ pj_cstr(&type_name, rdata->tp_info.transport->type_name);<br>+ }<br>+<br>+ if (!pj_stricmp(&uri->transport_param, &type_name)<br>+ && (endpoint->nat.rewrite_contact<br>+ /* Websockets are always rewritten */<br>+ || !pj_stricmp(&uri->transport_param,<br>+ pj_cstr(&type_name, "ws")))) {<br>+ /*<br>+ * The contact was rewritten to the reliable transport's<br>+ * source address. 
Disconnecting the transport for any<br>+ * reason invalidates the contact.<br>+ */<br>+ result = 0;<br>+ }<br>+ }<br>+<br>+ ao2_cleanup(endpoint);<br>+ return result;<br>+}<br>+<br> int ast_sip_get_transport_name(const struct ast_sip_endpoint *endpoint,<br> pjsip_sip_uri *sip_uri, char *buf, size_t buf_len)<br> {<br>diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h<br>index 5ce3c6f..1691505 100644<br>--- a/res/res_pjsip/include/res_pjsip_private.h<br>+++ b/res/res_pjsip/include/res_pjsip_private.h<br>@@ -395,4 +395,16 @@<br> */<br> int ast_sip_destroy_scheduler(void);<br> <br>+/*!<br>+ * \internal<br>+ * \brief Determines if a uri will still be valid after an asterisk restart<br>+ * \since 15.3.0<br>+ *<br>+ * \param uri uri to test<br>+ * \param rdata the rdata to get transport and endpoint information from<br>+ *<br>+ * \retval 1 Yes, 0 No<br>+ */<br>+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, pjsip_rx_data *rdata);<br>+<br> #endif /* RES_PJSIP_PRIVATE_H_ */<br>diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c<br>index 0f57303..5c9c664 100644<br>--- a/res/res_pjsip/pjsip_transport_events.c<br>+++ b/res/res_pjsip/pjsip_transport_events.c<br>@@ -41,6 +41,8 @@<br> <br> /*! Who to notify when transport shuts down. */<br> struct transport_monitor_notifier {<br>+ /*! An opaque ID for matching unregisters. */<br>+ enum ast_transport_monitor_id id;<br> /*! Who to call when transport shuts down. */<br> ast_transport_monitor_shutdown_cb cb;<br> /*! ao2 data object to pass to callback. 
*/<br>@@ -135,7 +137,7 @@<br> break;<br> }<br> monitored->transport = transport;<br>- if (AST_VECTOR_INIT(&monitored->monitors, 2)) {<br>+ if (AST_VECTOR_INIT(&monitored->monitors, 5)) {<br> ao2_ref(monitored, -1);<br> break;<br> }<br>@@ -166,6 +168,8 @@<br> struct transport_monitor_notifier *notifier;<br> <br> notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>+ ast_debug(3, "running callback %p(%d,%p) for transport %s\n",<br>+ notifier->cb, notifier->id, notifier->data, transport->obj_name);<br> notifier->cb(notifier->data);<br> }<br> ao2_ref(monitored, -1);<br>@@ -195,42 +199,64 @@<br> }<br> }<br> <br>-static int transport_monitor_unregister_all(void *obj, void *arg, int flags)<br>+struct callback_data {<br>+ enum ast_transport_monitor_id id;<br>+ void *data;<br>+ ast_transport_monitor_data_matcher matches;<br>+};<br>+<br>+static int transport_monitor_unregister_cb(void *obj, void *arg, int flags)<br> {<br> struct transport_monitor *monitored = obj;<br>- ast_transport_monitor_shutdown_cb cb = arg;<br>+ struct callback_data *cb = arg;<br> int idx;<br> <br> for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {<br> struct transport_monitor_notifier *notifier;<br> <br> notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>- if (notifier->cb == cb) {<br>+ if (notifier->id == cb->id && (!cb->data || cb->matches(cb->data, notifier->data))) {<br> ao2_cleanup(notifier->data);<br> AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);<br>- break;<br>+ ast_debug(3, "Unregistered monitor %p(%d,%p) from transport %s\n",<br>+ notifier->cb, notifier->id, notifier->data, monitored->transport->obj_name);<br> }<br> }<br> return 0;<br> }<br> <br>-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb)<br>+static int ptr_matcher(void *a, void *b)<br>+{<br>+ return a == b;<br>+}<br>+<br>+void ast_sip_transport_monitor_unregister_all(enum ast_transport_monitor_id id,<br>+ void *data, ast_transport_monitor_data_matcher matches)<br> 
{<br> struct ao2_container *transports;<br>+ struct callback_data cb = {<br>+ .id = id,<br>+ .data = data,<br>+ .matches = matches ?: ptr_matcher,<br>+ };<br>+<br>+ ast_assert(id != AST_TRANSPORT_MONITOR_ID_UNKNOWN);<br> <br> transports = ao2_global_obj_ref(active_transports);<br> if (!transports) {<br> return;<br> }<br>- ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all,<br>- cb);<br>+ ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_cb, &cb);<br> ao2_ref(transports, -1);<br> }<br> <br>-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb)<br>+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,<br>+ enum ast_transport_monitor_id id, void *data, ast_transport_monitor_data_matcher matches)<br> {<br> struct ao2_container *transports;<br> struct transport_monitor *monitored;<br>+<br>+ ast_assert(transport && id != AST_TRANSPORT_MONITOR_ID_UNKNOWN);<br> <br> transports = ao2_global_obj_ref(active_transports);<br> if (!transports) {<br>@@ -240,18 +266,13 @@<br> ao2_lock(transports);<br> monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);<br> if (monitored) {<br>- int idx;<br>+ struct callback_data cb = {<br>+ .id = id,<br>+ .data = data,<br>+ .matches = matches ?: ptr_matcher,<br>+ };<br> <br>- for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {<br>- struct transport_monitor_notifier *notifier;<br>-<br>- notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>- if (notifier->cb == cb) {<br>- ao2_cleanup(notifier->data);<br>- AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);<br>- break;<br>- }<br>- }<br>+ transport_monitor_unregister_cb(monitored, &cb, 0);<br> ao2_ref(monitored, -1);<br> }<br> ao2_unlock(transports);<br>@@ -259,7 +280,7 @@<br> }<br> <br> enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,<br>- 
ast_transport_monitor_shutdown_cb cb, void *ao2_data)<br>+ enum ast_transport_monitor_id id, ast_transport_monitor_shutdown_cb cb, void *ao2_data)<br> {<br> struct ao2_container *transports;<br> struct transport_monitor *monitored;<br>@@ -273,31 +294,23 @@<br> ao2_lock(transports);<br> monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);<br> if (monitored) {<br>- int idx;<br> struct transport_monitor_notifier new_monitor;<br>-<br>- /* Check if the callback monitor already exists */<br>- for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {<br>- struct transport_monitor_notifier *notifier;<br>-<br>- notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>- if (notifier->cb == cb) {<br>- /* The monitor is already in the vector replace with new ao2_data. */<br>- ao2_replace(notifier->data, ao2_data);<br>- res = AST_TRANSPORT_MONITOR_REG_REPLACED;<br>- goto register_done;<br>- }<br>- }<br> <br> /* Add new monitor to vector */<br> new_monitor.cb = cb;<br>+ new_monitor.id = id;<br> new_monitor.data = ao2_bump(ao2_data);<br> if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) {<br> ao2_cleanup(ao2_data);<br> res = AST_TRANSPORT_MONITOR_REG_FAILED;<br>+ ast_debug(3, "Register monitor %p(%d,%p) to transport %s FAILED\n",<br>+ cb, id, ao2_data, transport->obj_name);<br>+ } else {<br>+ res = AST_TRANSPORT_MONITOR_REG_SUCCESS;<br>+ ast_debug(3, "Registered monitor %p(%d,%p) to transport %s\n",<br>+ cb, id, ao2_data, transport->obj_name);<br> }<br> <br>-register_done:<br> ao2_ref(monitored, -1);<br> }<br> ao2_unlock(transports);<br>diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c<br>index d9afcd2..bff3fd7 100644<br>--- a/res/res_pjsip_outbound_registration.c<br>+++ b/res/res_pjsip_outbound_registration.c<br>@@ -850,6 +850,14 @@<br> }<br> }<br> <br>+static int monitor_matcher(void *a, void *b)<br>+{<br>+ char *ma = a;<br>+ char *mb = b;<br>+<br>+ return strcmp(ma, mb) == 0;<br>+}<br>+<br> 
static void registration_transport_monitor_setup(pjsip_transport *transport, const char *registration_name)<br> {<br> char *monitor;<br>@@ -869,8 +877,8 @@<br> * register the monitor. We might get into a message spamming infinite<br> * loop of registration, shutdown, reregistration...<br> */<br>- ast_sip_transport_monitor_register(transport, registration_transport_shutdown_cb,<br>- monitor);<br>+ ast_sip_transport_monitor_register(transport, AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,<br>+ registration_transport_shutdown_cb, monitor);<br> ao2_ref(monitor, -1);<br> }<br> <br>@@ -950,7 +958,8 @@<br> ast_debug(1, "Outbound unregistration to '%s' with client '%s' successful\n", server_uri, client_uri);<br> update_client_state_status(response->client_state, SIP_REGISTRATION_UNREGISTERED);<br> ast_sip_transport_monitor_unregister(response->rdata->tp_info.transport,<br>- registration_transport_shutdown_cb);<br>+ AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION, response->client_state->registration_name,<br>+ monitor_matcher);<br> }<br> } else if (response->client_state->destroy) {<br> /* We need to deal with the pending destruction instead. */<br>@@ -2149,7 +2158,8 @@<br> <br> ao2_global_obj_release(current_states);<br> <br>- ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb);<br>+ ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,<br>+ NULL, NULL);<br> <br> /* Wait for registration serializers to get destroyed. 
*/<br> ast_debug(2, "Waiting for registration transactions to complete for unload.\n");<br>diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c<br>index 1f24de0..c5447cd 100644<br>--- a/res/res_pjsip_pubsub.c<br>+++ b/res/res_pjsip_pubsub.c<br>@@ -127,6 +127,11 @@<br> <configOption name="contact_uri"><br> <synopsis>The Contact URI of the dialog for the subscription</synopsis><br> </configOption><br>+ <configOption name="prune_on_restart"><br>+ <synopsis>If set, indicates that the contact used a reliable transport<br>+ and therefore the subscription must be deleted after an asterisk restart<br>+ </synopsis><br>+ </configOption><br> </configObject><br> <configObject name="resource_list"><br> <synopsis>Resource list configuration parameters.</synopsis><br>@@ -382,6 +387,8 @@<br> struct timeval expires;<br> /*! Contact URI */<br> char contact_uri[PJSIP_MAX_URL_SIZE];<br>+ /*! Prune subscription on restart */<br>+ int prune_on_restart;<br> };<br> <br> /*!<br>@@ -446,6 +453,10 @@<br> * capable of restarting the timer.<br> */<br> struct ast_sip_sched_task *expiration_task;<br>+ /*! The transport the subscription was received on.<br>+ * Only used for reliable transports.<br>+ */<br>+ pjsip_transport *transport;<br> };<br> <br> /*!<br>@@ -549,6 +560,17 @@<br> return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);<br> }<br> <br>+static void sub_tree_transport_cb(void *data) {<br>+ struct sip_subscription_tree *sub_tree = data;<br>+<br>+ ast_debug(3, "Transport destroyed. Removing subscription '%s->%s' prune on restart: %d\n",<br>+ sub_tree->persistence->endpoint, sub_tree->root->resource,<br>+ sub_tree->persistence->prune_on_restart);<br>+<br>+ sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;<br>+ pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);<br>+}<br>+<br> /*! 
\brief Destructor for subscription persistence */<br> static void subscription_persistence_destroy(void *obj)<br> {<br>@@ -599,8 +621,9 @@<br> return;<br> }<br> <br>- ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,<br>- sub_tree->root->resource);<br>+ ast_debug(3, "Updating persistence for '%s->%s' prune on restart: %s\n",<br>+ sub_tree->persistence->endpoint, sub_tree->root->resource,<br>+ sub_tree->persistence->prune_on_restart ? "yes" : "no");<br> <br> dlg = sub_tree->dlg;<br> sub_tree->persistence->cseq = dlg->local.cseq;<br>@@ -614,6 +637,25 @@<br> sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));<br> <br> if (contact_hdr) {<br>+ if (contact_hdr) {<br>+ if (type == SUBSCRIPTION_PERSISTENCE_CREATED) {<br>+ sub_tree->persistence->prune_on_restart =<br>+ !ast_sip_will_uri_survive_restart(<br>+ (pjsip_sip_uri *)pjsip_uri_get_uri(contact_hdr->uri), rdata);<br>+<br>+ if (sub_tree->persistence->prune_on_restart) {<br>+ ast_debug(3, "adding transport monitor on %s for '%s->%s' prune on restart: %d\n",<br>+ rdata->tp_info.transport->obj_name,<br>+ sub_tree->persistence->endpoint, sub_tree->root->resource,<br>+ sub_tree->persistence->prune_on_restart);<br>+ sub_tree->transport = rdata->tp_info.transport;<br>+ ast_sip_transport_monitor_register(rdata->tp_info.transport,<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, sub_tree_transport_cb, sub_tree);<br>+ // N.B. 
ast_sip_transport_monitor_register holds a reference to the sub_tree<br>+ }<br>+ }<br>+ }<br>+<br> pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,<br> sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));<br> } else {<br>@@ -654,6 +696,15 @@<br> {<br> if (!sub_tree->persistence) {<br> return;<br>+ }<br>+<br>+ if (sub_tree->persistence->prune_on_restart && sub_tree->transport) {<br>+ ast_debug(3, "Unregistering transport monitor on %s '%s->%s'\n",<br>+ sub_tree->transport->obj_name,<br>+ sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",<br>+ sub_tree->root ? sub_tree->root->resource : "Unknown");<br>+ ast_sip_transport_monitor_unregister(sub_tree->transport,<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, sub_tree, NULL);<br> }<br> <br> ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);<br>@@ -1563,6 +1614,14 @@<br> struct ast_taskprocessor *serializer;<br> pjsip_rx_data rdata;<br> struct persistence_recreate_data recreate_data;<br>+<br>+ /* If this subscription used a reliable transport it can't be reestablished so remove it */<br>+ if (persistence->prune_on_restart) {<br>+ ast_debug(3, "Deleting subscription marked as 'prune' from persistent store '%s' %s\n",<br>+ persistence->endpoint, persistence->tag);<br>+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);<br>+ return 0;<br>+ }<br> <br> /* If this subscription has already expired remove it */<br> if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {<br>@@ -2924,6 +2983,7 @@<br> ind->expires = -1;<br> <br> sub_tree->persistence = subscription_persistence_create(sub_tree);<br>+<br> subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);<br> sip_subscription_accept(sub_tree, rdata, resp);<br> if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {<br>@@ -5423,6 +5483,8 @@<br> persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);<br> 
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0,<br> CHARFLDSET(struct subscription_persistence, contact_uri));<br>+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "prune_on_restart", "0", OPT_UINT_T, 0,<br>+ FLDSET(struct subscription_persistence, prune_on_restart));<br> <br> if (apply_list_configuration(sorcery)) {<br> ast_sched_context_destroy(sched);<br>@@ -5499,6 +5561,8 @@<br> AST_TEST_UNREGISTER(loop);<br> AST_TEST_UNREGISTER(bad_event);<br> <br>+ ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, NULL, NULL);<br>+<br> ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));<br> <br> ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);<br>diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c<br>index f0da6de..d53b256 100644<br>--- a/res/res_pjsip_registrar.c<br>+++ b/res/res_pjsip_registrar.c<br>@@ -327,6 +327,15 @@<br> char aor_name[0];<br> };<br> <br>+static int contact_transport_monitor_matcher(void *a, void *b)<br>+{<br>+ struct contact_transport_monitor *ma = a;<br>+ struct contact_transport_monitor *mb = b;<br>+<br>+ return strcmp(ma->aor_name, mb->aor_name) == 0<br>+ && strcmp(ma->contact_name, mb->contact_name) == 0;<br>+}<br>+<br> static void register_contact_transport_shutdown_cb(void *data)<br> {<br> struct contact_transport_monitor *monitor = data;<br>@@ -579,7 +588,6 @@<br> contact = ao2_callback(contacts, OBJ_UNLINK, registrar_find_contact, &details);<br> if (!contact) {<br> int prune_on_boot = 0;<br>- pj_str_t host_name;<br> <br> /* If they are actually trying to delete a contact that does not exist... be forgiving */<br> if (!expiration) {<br>@@ -588,35 +596,7 @@<br> continue;<br> }<br> <br>- /* Determine if the contact cannot survive a restart/boot. 
*/<br>- if (details.uri->port == rdata->pkt_info.src_port<br>- && !pj_strcmp(&details.uri->host,<br>- pj_cstr(&host_name, rdata->pkt_info.src_name))<br>- /* We have already checked if the URI scheme is sip: or sips: */<br>- && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {<br>- pj_str_t type_name;<br>-<br>- /* Determine the transport parameter value */<br>- if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {<br>- /* WSS is special, as it needs to be ws. */<br>- pj_cstr(&type_name, "ws");<br>- } else {<br>- pj_cstr(&type_name, rdata->tp_info.transport->type_name);<br>- }<br>-<br>- if (!pj_stricmp(&details.uri->transport_param, &type_name)<br>- && (endpoint->nat.rewrite_contact<br>- /* Websockets are always rewritten */<br>- || !pj_stricmp(&details.uri->transport_param,<br>- pj_cstr(&type_name, "ws")))) {<br>- /*<br>- * The contact was rewritten to the reliable transport's<br>- * source address. Disconnecting the transport for any<br>- * reason invalidates the contact.<br>- */<br>- prune_on_boot = 1;<br>- }<br>- }<br>+ prune_on_boot = !ast_sip_will_uri_survive_restart(details.uri, rdata);<br> <br> contact = ast_sip_location_create_contact(aor, contact_uri,<br> ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)),<br>@@ -645,6 +625,7 @@<br> strcpy(monitor->contact_name, contact_name);/* Safe */<br> <br> ast_sip_transport_monitor_register(rdata->tp_info.transport,<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,<br> register_contact_transport_shutdown_cb, monitor);<br> ao2_ref(monitor, -1);<br> }<br>@@ -703,6 +684,21 @@<br> contact_update->user_agent);<br> ao2_cleanup(contact_update);<br> } else {<br>+ if (contact->prune_on_boot) {<br>+ struct contact_transport_monitor *monitor;<br>+ const char *contact_name =<br>+ ast_sorcery_object_get_id(contact);<br>+<br>+ monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(aor_name)<br>+ + strlen(contact_name));<br>+ strcpy(monitor->aor_name, aor_name);/* Safe */<br>+ monitor->contact_name = monitor->aor_name 
+ strlen(aor_name) + 1;<br>+ strcpy(monitor->contact_name, contact_name);/* Safe */<br>+<br>+ ast_sip_transport_monitor_unregister(rdata->tp_info.transport,<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION, monitor, contact_transport_monitor_matcher);<br>+ }<br>+<br> /* We want to report the user agent that was actually in the removed contact */<br> ast_sip_location_delete_contact(contact);<br> ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name);<br>@@ -1125,7 +1121,7 @@<br> ast_manager_unregister(AMI_SHOW_REGISTRATIONS);<br> ast_manager_unregister(AMI_SHOW_REGISTRATION_CONTACT_STATUSES);<br> ast_sip_unregister_service(&registrar_module);<br>- ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb);<br>+ ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION, NULL, NULL);<br> return 0;<br> }<br> <br>diff --git a/res/res_pjsip_registrar_expire.c b/res/res_pjsip_registrar_expire.c<br>index fe4a60d..61a9071 100644<br>--- a/res/res_pjsip_registrar_expire.c<br>+++ b/res/res_pjsip_registrar_expire.c<br>@@ -32,6 +32,27 @@<br> #include "asterisk/module.h"<br> #include "asterisk/named_locks.h"<br> <br>+// These are common with res_pjsip_registrar!<br>+/*! Transport monitor for incoming REGISTER contacts */<br>+struct contact_transport_monitor {<br>+ /*!<br>+ * \brief Sorcery contact name to remove on transport shutdown<br>+ * \note Stored after aor_name in space reserved when struct allocated.<br>+ */<br>+ char *contact_name;<br>+ /*! AOR name the contact is associated */<br>+ char aor_name[0];<br>+};<br>+<br>+static int contact_transport_monitor_matcher(void *a, void *b)<br>+{<br>+ struct contact_transport_monitor *ma = a;<br>+ struct contact_transport_monitor *mb = b;<br>+<br>+ return strcmp(ma->aor_name, mb->aor_name) == 0<br>+ && strcmp(ma->contact_name, mb->contact_name) == 0;<br>+}<br>+<br> /*! 
\brief Thread keeping things alive */<br> static pthread_t check_thread = AST_PTHREADT_NULL;<br> <br>@@ -55,6 +76,21 @@<br> */<br> ao2_lock(lock);<br> if (ast_tvdiff_ms(ast_tvnow(), contact->expiration_time) > 0) {<br>+ if (contact->prune_on_boot) {<br>+ struct contact_transport_monitor *monitor;<br>+ const char *contact_name =<br>+ ast_sorcery_object_get_id(contact);<br>+<br>+ monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(contact->aor)<br>+ + strlen(contact_name));<br>+ strcpy(monitor->aor_name, contact->aor);/* Safe */<br>+ monitor->contact_name = monitor->aor_name + strlen(contact->aor) + 1;<br>+ strcpy(monitor->contact_name, contact_name);/* Safe */<br>+<br>+ // We don't have a pointer to transport here so we remove from all transports.<br>+ ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,<br>+ monitor, contact_transport_monitor_matcher);<br>+ }<br> ast_sip_location_delete_contact(contact);<br> }<br> ao2_unlock(lock);<br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/8083">change 8083</a>. To unsubscribe, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/8083"/><meta itemprop="name" content="View Change"/></div></div>
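<p>The unregister semantics described in the header comments of this change (match on a monitor ID, then on the data through an optional matcher, falling back to pointer comparison when no matcher is given) can be sketched outside Asterisk as a small, self-contained C model. Every <code>toy_</code> name below is hypothetical and is not part of the res_pjsip API; a fixed-size array stands in for the patch's vector, and ao2 reference counting is deliberately left out.</p>

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for the patch's types; NOT the Asterisk definitions. */
enum toy_monitor_id {
	TOY_ID_UNKNOWN,
	TOY_ID_INBOUND_REGISTRATION,
	TOY_ID_INBOUND_SUBSCRIPTION,
};

/* Returns non-zero when the two data objects are considered the same. */
typedef int (*toy_matcher)(void *a, void *b);

struct toy_monitor {
	enum toy_monitor_id id;
	void *data;
};

#define TOY_MAX_MONITORS 8

/* One transport can carry several monitors, even with the same ID. */
struct toy_transport {
	struct toy_monitor monitors[TOY_MAX_MONITORS];
	size_t count;
};

/* Default matcher: plain pointer comparison. */
int toy_ptr_matcher(void *a, void *b)
{
	return a == b;
}

/* Example matcher that compares the data as C strings. */
int toy_str_matcher(void *a, void *b)
{
	return strcmp(a, b) == 0;
}

/* Append a monitor without checking for duplicates.
 * Returns 0 on success, -1 when the table is full. */
int toy_register(struct toy_transport *tp, enum toy_monitor_id id, void *data)
{
	if (tp->count == TOY_MAX_MONITORS) {
		return -1;
	}
	tp->monitors[tp->count].id = id;
	tp->monitors[tp->count].data = data;
	tp->count++;
	return 0;
}

/* Remove every monitor with the given ID whose data matches.
 * NULL data removes all monitors with that ID.
 * Returns the number of monitors removed. */
size_t toy_unregister(struct toy_transport *tp, enum toy_monitor_id id,
	void *data, toy_matcher matches)
{
	toy_matcher m = matches ? matches : toy_ptr_matcher;
	size_t removed = 0;
	size_t idx;

	/* Walk backwards so unordered removal never skips an entry. */
	for (idx = tp->count; idx--;) {
		struct toy_monitor *mon = &tp->monitors[idx];

		if (mon->id == id && (!data || m(data, mon->data))) {
			/* Unordered removal: move the last entry into this slot. */
			*mon = tp->monitors[tp->count - 1];
			tp->count--;
			removed++;
		}
	}
	return removed;
}
```

<p>In this model, registering two subscription monitors on one transport and then unregistering with a string matcher removes only the entry whose data compares equal. Passing NULL data removes every monitor carrying that ID, which mirrors the module-unload calls in the diff.</p>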
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 15 </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36 </div>
<div style="display:none"> Gerrit-Change-Number: 8083 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: George Joseph <gjoseph@digium.com> </div>