<p>George Joseph has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/8083">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">res_pjsip_pubsub: Prune subs with reliable transports at startup<br><br>In an earlier release, inbound registrations on a reliable transport<br>were pruned on Asterisk restart since the TCP connection would have<br>been torn down and become unusable when Asterisk stopped.  This same<br>process is now also applied to inbound subscriptions.<br><br>To accomplish this, the pjsip_transport_event feature needed to<br>be refactored to allow multiple monitors (multiple subscriptions or<br>registrations from the same endpoint) to exist on the same transport.<br>Since this changed the API, any external modules that may have used the<br>transport monitor feature (highly unlikely) will need to be changed.<br><br>ASTERISK-27612<br>Reported by: Ross Beer<br><br>Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36<br>---<br>M CHANGES<br>M UPGRADE.txt<br>A contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py<br>M include/asterisk/res_pjsip.h<br>M res/res_pjsip.c<br>M res/res_pjsip/include/res_pjsip_private.h<br>M res/res_pjsip/pjsip_transport_events.c<br>M res/res_pjsip_outbound_registration.c<br>M res/res_pjsip_pubsub.c<br>M res/res_pjsip_registrar.c<br>M res/res_pjsip_registrar_expire.c<br>11 files changed, 354 insertions(+), 83 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/83/8083/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">diff --git a/CHANGES b/CHANGES<br>index 2f6c448..6b3c8cd 100644<br>--- a/CHANGES<br>+++ b/CHANGES<br>@@ -31,6 +31,10 @@<br>    identifier method split into the "ip" and "header" endpoint identifier<br>    methods.<br> <br>+ * The pjsip_transport_event feature introduced 
in 15.1.0 has been refactored.<br>+   Any external modules that may have used that feature (highly unlikely) will<br>+   need to be changed as the API has been altered slightly.<br>+<br> res_pjsip_endpoint_identifier_ip<br> ------------------<br>  * The endpoint identifier "ip" method previously recognized endpoints either<br>@@ -49,6 +53,17 @@<br> ------------------<br>  * Removed the unused and incomplete SDP processing modules.<br> <br>+res_pjsip_pubsub<br>+------------------<br>+ * In an earlier release, inbound registrations on a reliable transport<br>+   were pruned on Asterisk restart since the TCP connection would have<br>+   been torn down and become unusable when Asterisk stopped.  This same<br>+   process is now also applied to inbound subscriptions.  Since this<br>+   required the addition of a new column to the ps_subscription_persistence<br>+   realtime table, users who store their subscriptions in a database will<br>+   need to run the "alembic upgrade head" process to add the column to<br>+   the schema. 
<br>+<br> ------------------------------------------------------------------------------<br> --- Functionality changes from Asterisk 15.1.0 to Asterisk 15.2.0 ------------<br> ------------------------------------------------------------------------------<br>diff --git a/UPGRADE.txt b/UPGRADE.txt<br>index d47bbe3..3a1858b 100644<br>--- a/UPGRADE.txt<br>+++ b/UPGRADE.txt<br>@@ -32,6 +32,10 @@<br>    identifier method split into the "ip" and "header" endpoint identifier<br>    methods.<br> <br>+ * The pjsip_transport_event feature introduced in 15.1.0 has been refactored.<br>+   Any external modules that may have used that feature (highly unlikely) will<br>+   need to be changed as the API has been altered slightly.<br>+<br> res_pjsip_endpoint_identifier_ip<br> ------------------<br>  * The endpoint identifier "ip" method previously recognized endpoints either<br>diff --git a/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py b/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py<br>new file mode 100644<br>index 0000000..b0bbe13<br>--- /dev/null<br>+++ b/contrib/ast-db-manage/config/versions/d3e4284f8707_add_prune_on_restart_to_ps_subscription_.py<br>@@ -0,0 +1,33 @@<br>+"""add prune_on_restart to ps_subscription_persistence<br>+<br>+Revision ID: d3e4284f8707<br>+Revises: 52798ad97bdf<br>+Create Date: 2018-01-28 17:45:36.218123<br>+<br>+"""<br>+<br>+# revision identifiers, used by Alembic.<br>+revision = 'd3e4284f8707'<br>+down_revision = '52798ad97bdf'<br>+<br>+from alembic import op<br>+import sqlalchemy as sa<br>+from sqlalchemy.dialects.postgresql import ENUM<br>+<br>+YESNO_NAME = 'yesno_values'<br>+YESNO_VALUES = ['yes', 'no']<br>+<br>+<br>+def upgrade():<br>+    ############################# Enums ##############################<br>+<br>+    # yesno_values have already been created, so use postgres enum object<br>+    # type to get around "already created" issue - works okay with 
mysql<br>+    yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False)<br>+<br>+    op.add_column('ps_subscription_persistence', sa.Column('prune_on_restart', yesno_values))<br>+<br>+def downgrade():<br>+    if op.get_context().bind.dialect.name == 'mssql':<br>+        op.drop_constraint('ck_ps_subscription_persistence_prune_on_restart_yesno_values','ps_subscription_persistence')<br>+    op.drop_column('ps_subscription_persistence', 'prune_on_restart')<br>diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h<br>index 330e49f..a1b71a8 100644<br>--- a/include/asterisk/res_pjsip.h<br>+++ b/include/asterisk/res_pjsip.h<br>@@ -2992,6 +2992,18 @@<br>  */<br> typedef void (*ast_transport_monitor_shutdown_cb)(void *data);<br> <br>+/*!<br>+ * \brief Transport shutdown monitor data matcher<br>+ * \since 13.20.0<br>+ *<br>+ * \param a User data to compare.<br>+ * \param b User data to compare.<br>+ *<br>+ * \retval 1 The data objects match<br>+ * \retval 0 The data objects don't match<br>+ */<br>+typedef int (*ast_transport_monitor_data_matcher)(void *a, void *b);<br>+<br> enum ast_transport_monitor_reg {<br>        /*! \brief Successfully registered the transport monitor */<br>   AST_TRANSPORT_MONITOR_REG_SUCCESS,<br>@@ -3006,39 +3018,71 @@<br>   AST_TRANSPORT_MONITOR_REG_FAILED,<br> };<br> <br>+enum ast_transport_monitor_id {<br>+  /*! \brief Unknown */<br>+        AST_TRANSPORT_MONITOR_ID_UNKNOWN,<br>+    /*! \brief Dynamic contact registrations */<br>+  AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,<br>+       /*! \brief Outbound registrations */<br>+ AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,<br>+      /*! \brief Inbound Subscriptions. 
*/<br>+ AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION,<br>+};<br>+<br> /*!<br>  * \brief Register a reliable transport shutdown monitor callback.<br>- * \since 13.18.0<br>+ * \since 13.20.0<br>  *<br>  * \param transport Transport to monitor for shutdown.<br>  * \param cb Who to call when transport is shutdown.<br>+ * \param id ID used for matching unregisters.<br>  * \param ao2_data Data to pass with the callback.<br>+ *<br>+ * \note The data object passed will have its reference count automatically<br>+ * incremented by this call and automatically decremented after the callback<br>+ * runs or when the callback is unregistered.<br>+ *<br>+ * There is no checking for duplicate registrations.<br>  *<br>  * \return enum ast_transport_monitor_reg<br>  */<br> enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,<br>-   ast_transport_monitor_shutdown_cb cb, void *ao2_data);<br>+       enum ast_transport_monitor_id id, ast_transport_monitor_shutdown_cb cb, void *ao2_data);<br> <br> /*!<br>- * \brief Unregister a reliable transport shutdown monitor callback.<br>- * \since 13.18.0<br>+ * \brief Unregister a reliable transport shutdown monitor<br>+ * \since 13.20.0<br>  *<br>  * \param transport Transport to monitor for shutdown.<br>- * \param cb Who to call when transport is shutdown.<br>+ * \param id ast_transport_monitor_id to match.<br>+ * \param data Data to pass to the matcher. May be NULL and does NOT need to be an ao2 object.<br>+ * \param matches Matcher function that returns true if data matches the previously<br>+ *                registered data object.  
If NULL, a simple pointer comparison is done.<br>+ *<br>+ * \note The data object passed into the original register will have its reference count<br>+ * automatically decremented.<br>  *<br>  * \return Nothing<br>  */<br>-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb);<br>+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,<br>+   enum ast_transport_monitor_id id, void *data, ast_transport_monitor_data_matcher matches);<br> <br> /*!<br>- * \brief Unregister monitor callback from all reliable transports.<br>- * \since 13.18.0<br>+ * \brief Unregister a transport shutdown monitor from all reliable transports<br>+ * \since 13.20.0<br>  *<br>- * \param cb Who to call when a transport is shutdown.<br>+ * \param id ast_transport_monitor_id to match.<br>+ * \param data Data to pass to the matcher. May be NULL and does NOT need to be an ao2 object.<br>+ * \param matches Matcher function that returns true if data matches the previously<br>+ *                registered data object.  If NULL, a simple pointer comparison is done.<br>+ *<br>+ * \note The data object passed into the original register will have its reference count<br>+ * automatically decremented.<br>  *<br>  * \return Nothing<br>  */<br>-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb);<br>+void ast_sip_transport_monitor_unregister_all(enum ast_transport_monitor_id id,<br>+    void *data, ast_transport_monitor_data_matcher matches);<br> <br> /*! Transport state notification registration element.  
*/<br> struct ast_sip_tpmgr_state_callback {<br>diff --git a/res/res_pjsip.c b/res/res_pjsip.c<br>index 23bf713..dc14088 100644<br>--- a/res/res_pjsip.c<br>+++ b/res/res_pjsip.c<br>@@ -2943,6 +2943,50 @@<br>         return ast_pjsip_endpoint;<br> }<br> <br>+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, pjsip_rx_data *rdata)<br>+{<br>+       struct ast_sip_endpoint *endpoint = ast_pjsip_rdata_get_endpoint(rdata);<br>+     pj_str_t host_name;<br>+  int result = 1;<br>+<br>+   if (!endpoint) {<br>+             return 1;<br>+    }<br>+<br>+ /* Determine if the contact cannot survive a restart/boot. */<br>+        if (uri->port == rdata->pkt_info.src_port<br>+              && !pj_strcmp(&uri->host,<br>+                     pj_cstr(&host_name, rdata->pkt_info.src_name))<br>+                /* We have already checked if the URI scheme is sip: or sips: */<br>+             && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {<br>+               pj_str_t type_name;<br>+<br>+               /* Determine the transport parameter value */<br>+                if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {<br>+                       /* WSS is special, as it needs to be ws. */<br>+                  pj_cstr(&type_name, "ws");<br>+             } else {<br>+                     pj_cstr(&type_name, rdata->tp_info.transport->type_name);<br>+          }<br>+<br>+         if (!pj_stricmp(&uri->transport_param, &type_name)<br>+                        && (endpoint->nat.rewrite_contact<br>+                         /* Websockets are always rewritten */<br>+                                || !pj_stricmp(&uri->transport_param,<br>+                                 pj_cstr(&type_name, "ws")))) {<br>+                 /*<br>+                    * The contact was rewritten to the reliable transport's<br>+                  * source address.  
Disconnecting the transport for any<br>+                       * reason invalidates the contact.<br>+                    */<br>+                  result = 0;<br>+          }<br>+    }<br>+<br>+ ao2_cleanup(endpoint);<br>+       return result;<br>+}<br>+<br> int ast_sip_get_transport_name(const struct ast_sip_endpoint *endpoint,<br>       pjsip_sip_uri *sip_uri, char *buf, size_t buf_len)<br> {<br>diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h<br>index 5ce3c6f..1691505 100644<br>--- a/res/res_pjsip/include/res_pjsip_private.h<br>+++ b/res/res_pjsip/include/res_pjsip_private.h<br>@@ -395,4 +395,16 @@<br>  */<br> int ast_sip_destroy_scheduler(void);<br> <br>+/*!<br>+ * \internal<br>+ * \brief Determines if a uri will still be valid after an asterisk restart<br>+ * \since 15.3.0<br>+ *<br>+ * \param uri uri to test<br>+ * \param rdata the rdata to get transport and endpoint information from<br>+ *<br>+ * \retval 1 Yes, 0 No<br>+ */<br>+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, pjsip_rx_data *rdata);<br>+<br> #endif /* RES_PJSIP_PRIVATE_H_ */<br>diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c<br>index 0f57303..5c9c664 100644<br>--- a/res/res_pjsip/pjsip_transport_events.c<br>+++ b/res/res_pjsip/pjsip_transport_events.c<br>@@ -41,6 +41,8 @@<br> <br> /*! Who to notify when transport shuts down. */<br> struct transport_monitor_notifier {<br>+      /*! An opaque ID for matching unregisters. */<br>+        enum ast_transport_monitor_id id;<br>     /*! Who to call when transport shuts down. */<br>         ast_transport_monitor_shutdown_cb cb;<br>         /*! ao2 data object to pass to callback. 
*/<br>@@ -135,7 +137,7 @@<br>                              break;<br>                        }<br>                     monitored->transport = transport;<br>-                 if (AST_VECTOR_INIT(&monitored->monitors, 2)) {<br>+                       if (AST_VECTOR_INIT(&monitored->monitors, 5)) {<br>                                ao2_ref(monitored, -1);<br>                               break;<br>                        }<br>@@ -166,6 +168,8 @@<br>                                        struct transport_monitor_notifier *notifier;<br> <br>                                       notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>+                                    ast_debug(3, "running callback %p(%d,%p) for transport %s\n",<br>+                                              notifier->cb, notifier->id, notifier->data, transport->obj_name);<br>                                         notifier->cb(notifier->data);<br>                           }<br>                             ao2_ref(monitored, -1);<br>@@ -195,42 +199,64 @@<br>        }<br> }<br> <br>-static int transport_monitor_unregister_all(void *obj, void *arg, int flags)<br>+struct callback_data {<br>+     enum ast_transport_monitor_id id;<br>+    void *data;<br>+  ast_transport_monitor_data_matcher matches;<br>+};<br>+<br>+static int transport_monitor_unregister_cb(void *obj, void *arg, int flags)<br> {<br>         struct transport_monitor *monitored = obj;<br>-   ast_transport_monitor_shutdown_cb cb = arg;<br>+  struct callback_data *cb = arg;<br>       int idx;<br> <br>   for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {<br>            struct transport_monitor_notifier *notifier;<br> <br>               notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>-            if (notifier->cb == cb) {<br>+         if (notifier->id == cb->id && (!cb->data || cb->matches(cb->data, notifier->data))) {<br>                       ao2_cleanup(notifier->data);<br>                       
AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);<br>-                       break;<br>+                       ast_debug(3, "Unregistered monitor %p(%d,%p) from transport %s\n",<br>+                         notifier->cb, notifier->id, notifier->data, monitored->transport->obj_name);<br>           }<br>     }<br>     return 0;<br> }<br> <br>-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb)<br>+static int ptr_matcher(void *a, void *b)<br>+{<br>+        return a == b;<br>+}<br>+<br>+void ast_sip_transport_monitor_unregister_all(enum ast_transport_monitor_id id,<br>+      void *data, ast_transport_monitor_data_matcher matches)<br> {<br>   struct ao2_container *transports;<br>+    struct callback_data cb = {<br>+          .id = id,<br>+            .data = data,<br>+                .matches = matches ?: ptr_matcher,<br>+   };<br>+<br>+        ast_assert(id != AST_TRANSPORT_MONITOR_ID_UNKNOWN);<br> <br>        transports = ao2_global_obj_ref(active_transports);<br>   if (!transports) {<br>            return;<br>       }<br>-    ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all,<br>-                cb);<br>+ ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_cb, &cb);<br>        ao2_ref(transports, -1);<br> }<br> <br>-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb)<br>+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,<br>+        enum ast_transport_monitor_id id, void *data, ast_transport_monitor_data_matcher matches)<br> {<br>         struct ao2_container *transports;<br>     struct transport_monitor *monitored;<br>+<br>+      ast_assert(transport && id != AST_TRANSPORT_MONITOR_ID_UNKNOWN);<br> <br>   transports = ao2_global_obj_ref(active_transports);<br>   if (!transports) {<br>@@ -240,18 +266,13 @@<br>     ao2_lock(transports);<br>         monitored = ao2_find(transports, 
transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);<br>        if (monitored) {<br>-             int idx;<br>+             struct callback_data cb = {<br>+                  .id = id,<br>+                    .data = data,<br>+                        .matches = matches ?: ptr_matcher,<br>+           };<br> <br>-                for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {<br>-                   struct transport_monitor_notifier *notifier;<br>-<br>-                      notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>-                    if (notifier->cb == cb) {<br>-                         ao2_cleanup(notifier->data);<br>-                              AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);<br>-                               break;<br>-                       }<br>-            }<br>+            transport_monitor_unregister_cb(monitored, &cb, 0);<br>               ao2_ref(monitored, -1);<br>       }<br>     ao2_unlock(transports);<br>@@ -259,7 +280,7 @@<br> }<br> <br> enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport,<br>-  ast_transport_monitor_shutdown_cb cb, void *ao2_data)<br>+        enum ast_transport_monitor_id id, ast_transport_monitor_shutdown_cb cb, void *ao2_data)<br> {<br>   struct ao2_container *transports;<br>     struct transport_monitor *monitored;<br>@@ -273,31 +294,23 @@<br>   ao2_lock(transports);<br>         monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);<br>        if (monitored) {<br>-             int idx;<br>              struct transport_monitor_notifier new_monitor;<br>-<br>-            /* Check if the callback monitor already exists */<br>-           for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {<br>-                   struct transport_monitor_notifier *notifier;<br>-<br>-                      notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);<br>-                    if (notifier->cb == cb) {<br>-      
                   /* The monitor is already in the vector replace with new ao2_data. */<br>-                                ao2_replace(notifier->data, ao2_data);<br>-                            res = AST_TRANSPORT_MONITOR_REG_REPLACED;<br>-                            goto register_done;<br>-                  }<br>-            }<br> <br>          /* Add new monitor to vector */<br>               new_monitor.cb = cb;<br>+         new_monitor.id = id;<br>          new_monitor.data = ao2_bump(ao2_data);<br>                if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) {<br>                    ao2_cleanup(ao2_data);<br>                        res = AST_TRANSPORT_MONITOR_REG_FAILED;<br>+                      ast_debug(3, "Register monitor %p(%d,%p) to transport %s FAILED\n",<br>+                                cb, id, ao2_data, transport->obj_name);<br>+           } else {<br>+                     res = AST_TRANSPORT_MONITOR_REG_SUCCESS;<br>+                     ast_debug(3, "Registered monitor %p(%d,%p) to transport %s\n",<br>+                             cb, id, ao2_data, transport->obj_name);<br>            }<br> <br>-register_done:<br>                 ao2_ref(monitored, -1);<br>       }<br>     ao2_unlock(transports);<br>diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c<br>index d9afcd2..bff3fd7 100644<br>--- a/res/res_pjsip_outbound_registration.c<br>+++ b/res/res_pjsip_outbound_registration.c<br>@@ -850,6 +850,14 @@<br>    }<br> }<br> <br>+static int monitor_matcher(void *a, void *b)<br>+{<br>+  char *ma = a;<br>+        char *mb = b;<br>+<br>+     return strcmp(ma, mb) == 0;<br>+}<br>+<br> static void registration_transport_monitor_setup(pjsip_transport *transport, const char *registration_name)<br> {<br>  char *monitor;<br>@@ -869,8 +877,8 @@<br>    * register the monitor.  
We might get into a message spamming infinite<br>        * loop of registration, shutdown, reregistration...<br>   */<br>-  ast_sip_transport_monitor_register(transport, registration_transport_shutdown_cb,<br>-            monitor);<br>+    ast_sip_transport_monitor_register(transport, AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,<br>+                registration_transport_shutdown_cb, monitor);<br>         ao2_ref(monitor, -1);<br> }<br> <br>@@ -950,7 +958,8 @@<br>                     ast_debug(1, "Outbound unregistration to '%s' with client '%s' successful\n", server_uri, client_uri);<br>                      update_client_state_status(response->client_state, SIP_REGISTRATION_UNREGISTERED);<br>                         ast_sip_transport_monitor_unregister(response->rdata->tp_info.transport,<br>-                               registration_transport_shutdown_cb);<br>+                         AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION, response->client_state->registration_name,<br>+                             monitor_matcher);<br>             }<br>     } else if (response->client_state->destroy) {<br>           /* We need to deal with the pending destruction instead. */<br>@@ -2149,7 +2158,8 @@<br> <br>         ao2_global_obj_release(current_states);<br> <br>-   ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb);<br>+        ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_OUTBOUND_REGISTRATION,<br>+             NULL, NULL);<br> <br>       /* Wait for registration serializers to get destroyed. 
*/<br>     ast_debug(2, "Waiting for registration transactions to complete for unload.\n");<br>diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c<br>index 1f24de0..c5447cd 100644<br>--- a/res/res_pjsip_pubsub.c<br>+++ b/res/res_pjsip_pubsub.c<br>@@ -127,6 +127,11 @@<br>                             <configOption name="contact_uri"><br>                                     <synopsis>The Contact URI of the dialog for the subscription</synopsis><br>                           </configOption><br>+                                <configOption name="prune_on_restart"><br>+                                       <synopsis>If set, indicates that the contact used a reliable transport<br>+                                 and therefore the subscription must be deleted after an asterisk restart<br>+                                     </synopsis><br>+                            </configOption><br>                         </configObject><br>                         <configObject name="resource_list"><br>                           <synopsis>Resource list configuration parameters.</synopsis><br>@@ -382,6 +387,8 @@<br>         struct timeval expires;<br>       /*! Contact URI */<br>    char contact_uri[PJSIP_MAX_URL_SIZE];<br>+        /*! Prune subscription on restart */<br>+ int prune_on_restart;<br> };<br> <br> /*!<br>@@ -446,6 +453,10 @@<br>      * capable of restarting the timer.<br>    */<br>   struct ast_sip_sched_task *expiration_task;<br>+  /*! The transport the subscription was received on.<br>+   * Only used for reliable transports.<br>+         */<br>+  pjsip_transport *transport;<br> };<br> <br> /*!<br>@@ -549,6 +560,17 @@<br>       return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);<br> }<br> <br>+static void sub_tree_transport_cb(void *data) {<br>+        struct sip_subscription_tree *sub_tree = data;<br>+<br>+    ast_debug(3, "Transport destroyed.  
Removing subscription '%s->%s'  prune on restart: %d\n",<br>+            sub_tree->persistence->endpoint, sub_tree->root->resource,<br>+               sub_tree->persistence->prune_on_restart);<br>+<br>+   sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;<br>+     pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);<br>+}<br>+<br> /*! \brief Destructor for subscription persistence */<br> static void subscription_persistence_destroy(void *obj)<br> {<br>@@ -599,8 +621,9 @@<br>                 return;<br>       }<br> <br>- ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,<br>-              sub_tree->root->resource);<br>+     ast_debug(3, "Updating persistence for '%s->%s'  prune on restart: %s\n",<br>+               sub_tree->persistence->endpoint, sub_tree->root->resource,<br>+               sub_tree->persistence->prune_on_restart ? "yes" : "no");<br> <br>         dlg = sub_tree->dlg;<br>       sub_tree->persistence->cseq = dlg->local.cseq;<br>@@ -614,6 +637,25 @@<br>                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));<br> <br>            if (contact_hdr) {<br>+                   if (contact_hdr) {<br>+                           if (type == SUBSCRIPTION_PERSISTENCE_CREATED) {<br>+                                      sub_tree->persistence->prune_on_restart =<br>+                                              !ast_sip_will_uri_survive_restart(<br>+                                                   (pjsip_sip_uri *)pjsip_uri_get_uri(contact_hdr->uri), rdata);<br>+<br>+                                  if (sub_tree->persistence->prune_on_restart) {<br>+                                         ast_debug(3, "adding transport monitor on %s for '%s->%s'  prune on restart: %d\n",<br>+                                                     rdata->tp_info.transport->obj_name,<br>+                                                    sub_tree->persistence->endpoint, sub_tree->root->resource,<br>+          
                                             sub_tree->persistence->prune_on_restart);<br>+                                              sub_tree->transport = rdata->tp_info.transport;<br>+                                                ast_sip_transport_monitor_register(rdata->tp_info.transport,<br>+                                                      AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, sub_tree_transport_cb, sub_tree);<br>+                                             // N.B.  ast_sip_transport_monitor_register holds a reference to the sub_tree<br>+                                        }<br>+                            }<br>+                    }<br>+<br>                  pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,<br>                                        sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));<br>          } else {<br>@@ -654,6 +696,15 @@<br> {<br>    if (!sub_tree->persistence) {<br>              return;<br>+      }<br>+<br>+ if (sub_tree->persistence->prune_on_restart && sub_tree->transport) {<br>+               ast_debug(3, "Unregistering transport monitor on %s '%s->%s'\n",<br>+                        sub_tree->transport->obj_name,<br>+                 sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",<br>+                      sub_tree->root ? 
sub_tree->root->resource : "Unknown");<br>+           ast_sip_transport_monitor_unregister(sub_tree->transport,<br>+                 AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, sub_tree, NULL);<br>       }<br> <br>  ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);<br>@@ -1563,6 +1614,14 @@<br>  struct ast_taskprocessor *serializer;<br>         pjsip_rx_data rdata;<br>  struct persistence_recreate_data recreate_data;<br>+<br>+   /* If this subscription used a reliable transport it can't be reestablished so remove it */<br>+      if (persistence->prune_on_restart) {<br>+              ast_debug(3, "Deleting subscription marked as 'prune' from persistent store '%s' %s\n",<br>+                    persistence->endpoint, persistence->tag);<br>+              ast_sorcery_delete(ast_sip_get_sorcery(), persistence);<br>+              return 0;<br>+    }<br> <br>  /* If this subscription has already expired remove it */<br>      if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {<br>@@ -2924,6 +2983,7 @@<br>             ind->expires = -1;<br> <br>              sub_tree->persistence = subscription_persistence_create(sub_tree);<br>+<br>              subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);<br>           sip_subscription_accept(sub_tree, rdata, resp);<br>               if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {<br>@@ -5423,6 +5483,8 @@<br>            persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);<br>  ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0,<br>          CHARFLDSET(struct subscription_persistence, contact_uri));<br>+   ast_sorcery_object_field_register(sorcery, "subscription_persistence", "prune_on_restart", "0", OPT_UINT_T, 0,<br>+         FLDSET(struct subscription_persistence, prune_on_restart));<br> <br>        if (apply_list_configuration(sorcery)) {<br>              
ast_sched_context_destroy(sched);<br>@@ -5499,6 +5561,8 @@<br>      AST_TEST_UNREGISTER(loop);<br>    AST_TEST_UNREGISTER(bad_event);<br> <br>+   ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_SUBSCRIPTION, NULL, NULL);<br>+<br>       ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));<br> <br>        ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);<br>diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c<br>index f0da6de..d53b256 100644<br>--- a/res/res_pjsip_registrar.c<br>+++ b/res/res_pjsip_registrar.c<br>@@ -327,6 +327,15 @@<br>   char aor_name[0];<br> };<br> <br>+static int contact_transport_monitor_matcher(void *a, void *b)<br>+{<br>+       struct contact_transport_monitor *ma = a;<br>+    struct contact_transport_monitor *mb = b;<br>+<br>+ return strcmp(ma->aor_name, mb->aor_name) == 0<br>+         && strcmp(ma->contact_name, mb->contact_name) == 0;<br>+}<br>+<br> static void register_contact_transport_shutdown_cb(void *data)<br> {<br>         struct contact_transport_monitor *monitor = data;<br>@@ -579,7 +588,6 @@<br>                contact = ao2_callback(contacts, OBJ_UNLINK, registrar_find_contact, &details);<br>           if (!contact) {<br>                       int prune_on_boot = 0;<br>-                       pj_str_t host_name;<br> <br>                        /* If they are actually trying to delete a contact that does not exist... be forgiving */<br>                     if (!expiration) {<br>@@ -588,35 +596,7 @@<br>                              continue;<br>                     }<br> <br>-                 /* Determine if the contact cannot survive a restart/boot. 
*/<br>-                        if (details.uri->port == rdata->pkt_info.src_port<br>-                              && !pj_strcmp(&details.uri->host,<br>-                                     pj_cstr(&host_name, rdata->pkt_info.src_name))<br>-                                /* We have already checked if the URI scheme is sip: or sips: */<br>-                             && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {<br>-                               pj_str_t type_name;<br>-<br>-                               /* Determine the transport parameter value */<br>-                                if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {<br>-                                       /* WSS is special, as it needs to be ws. */<br>-                                  pj_cstr(&type_name, "ws");<br>-                             } else {<br>-                                     pj_cstr(&type_name, rdata->tp_info.transport->type_name);<br>-                          }<br>-<br>-                         if (!pj_stricmp(&details.uri->transport_param, &type_name)<br>-                                        && (endpoint->nat.rewrite_contact<br>-                                         /* Websockets are always rewritten */<br>-                                                || !pj_stricmp(&details.uri->transport_param,<br>-                                                 pj_cstr(&type_name, "ws")))) {<br>-                                 /*<br>-                                    * The contact was rewritten to the reliable transport's<br>-                                  * source address.  
Disconnecting the transport for any<br>-                                       * reason invalidates the contact.<br>-                                    */<br>-                                  prune_on_boot = 1;<br>-                           }<br>-                    }<br>+                    prune_on_boot = !ast_sip_will_uri_survive_restart(details.uri, rdata);<br> <br>                     contact = ast_sip_location_create_contact(aor, contact_uri,<br>                           ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)),<br>@@ -645,6 +625,7 @@<br>                                      strcpy(monitor->contact_name, contact_name);/* Safe */<br> <br>                                  ast_sip_transport_monitor_register(rdata->tp_info.transport,<br>+                                              AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,<br>                                                register_contact_transport_shutdown_cb, monitor);<br>                                     ao2_ref(monitor, -1);<br>                                 }<br>@@ -703,6 +684,21 @@<br>                                       contact_update->user_agent);<br>                       ao2_cleanup(contact_update);<br>          } else {<br>+                     if (contact->prune_on_boot) {<br>+                             struct contact_transport_monitor *monitor;<br>+                           const char *contact_name =<br>+                                   ast_sorcery_object_get_id(contact);<br>+<br>+                               monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(aor_name)<br>+                                 + strlen(contact_name));<br>+                             strcpy(monitor->aor_name, aor_name);/* Safe */<br>+                            monitor->contact_name = monitor->aor_name + strlen(aor_name) + 1;<br>+                              strcpy(monitor->contact_name, contact_name);/* Safe */<br>+<br>+                         
ast_sip_transport_monitor_unregister(rdata->tp_info.transport,<br>+                                    AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION, monitor, contact_transport_monitor_matcher);<br>+                  }<br>+<br>                  /* We want to report the user agent that was actually in the removed contact */<br>                       ast_sip_location_delete_contact(contact);<br>                     ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name);<br>@@ -1125,7 +1121,7 @@<br>   ast_manager_unregister(AMI_SHOW_REGISTRATIONS);<br>       ast_manager_unregister(AMI_SHOW_REGISTRATION_CONTACT_STATUSES);<br>       ast_sip_unregister_service(&registrar_module);<br>-   ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb);<br>+    ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION, NULL, NULL);<br>  return 0;<br> }<br> <br>diff --git a/res/res_pjsip_registrar_expire.c b/res/res_pjsip_registrar_expire.c<br>index fe4a60d..61a9071 100644<br>--- a/res/res_pjsip_registrar_expire.c<br>+++ b/res/res_pjsip_registrar_expire.c<br>@@ -32,6 +32,27 @@<br> #include "asterisk/module.h"<br> #include "asterisk/named_locks.h"<br> <br>+//  These are common with res_pjsip_registrar!<br>+/*! Transport monitor for incoming REGISTER contacts */<br>+struct contact_transport_monitor {<br>+      /*!<br>+   * \brief Sorcery contact name to remove on transport shutdown<br>+        * \note Stored after aor_name in space reserved when struct allocated.<br>+       */<br>+  char *contact_name;<br>+  /*! AOR name the contact is associated */<br>+    char aor_name[0];<br>+};<br>+<br>+static int contact_transport_monitor_matcher(void *a, void *b)<br>+{<br>+       struct contact_transport_monitor *ma = a;<br>+    struct contact_transport_monitor *mb = b;<br>+<br>+ return strcmp(ma->aor_name, mb->aor_name) == 0<br>+         && strcmp(ma->contact_name, mb->contact_name) == 0;<br>+}<br>+<br> /*! 
\brief Thread keeping things alive */<br> static pthread_t check_thread = AST_PTHREADT_NULL;<br> <br>@@ -55,6 +76,21 @@<br>       */<br>   ao2_lock(lock);<br>       if (ast_tvdiff_ms(ast_tvnow(), contact->expiration_time) > 0) {<br>+                if (contact->prune_on_boot) {<br>+                     struct contact_transport_monitor *monitor;<br>+                   const char *contact_name =<br>+                           ast_sorcery_object_get_id(contact);<br>+<br>+                       monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(contact->aor)<br>+                          + strlen(contact_name));<br>+                     strcpy(monitor->aor_name, contact->aor);/* Safe */<br>+                     monitor->contact_name = monitor->aor_name + strlen(contact->aor) + 1;<br>+                       strcpy(monitor->contact_name, contact_name);/* Safe */<br>+<br>+                 // We don't have a pointer to transport here so we remove from all transports.<br>+                   ast_sip_transport_monitor_unregister_all(AST_TRANSPORT_MONITOR_ID_INBOUND_REGISTRATION,<br>+                              monitor, contact_transport_monitor_matcher);<br>+         }<br>             ast_sip_location_delete_contact(contact);<br>     }<br>     ao2_unlock(lock);<br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/8083">change 8083</a>. To unsubscribe, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/8083"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 15 </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36 </div>
<div style="display:none"> Gerrit-Change-Number: 8083 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: George Joseph &lt;gjoseph@digium.com&gt; </div>