<p>Joshua Colp has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.asterisk.org/10479">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">stasis: Add internal filtering of messages.<br><br>This change adds the ability for subscriptions to indicate<br>which message types they are interested in accepting. By<br>doing so the filtering is done before being dispatched<br>to the subscriber, reducing the amount of work that has<br>to be done.<br><br>This is optional and if a subscriber does not add<br>message types they wish to accept the previous behavior<br>is preserved and they receive all messages.<br><br>There is also the ability to explicitly force the reception<br>of all messages for cases such as AMI or ARI where a large<br>number of messages are expected that are then generically<br>converted into a different format.<br><br>ASTERISK-28103<br><br>Change-Id: I99bee23895baa0a117985d51683f7963b77aa190<br>---<br>M apps/app_queue.c<br>M channels/chan_dahdi.c<br>M channels/chan_iax2.c<br>M channels/chan_mgcp.c<br>M channels/chan_sip.c<br>M channels/chan_skinny.c<br>M channels/sig_pri.c<br>M include/asterisk/stasis.h<br>M include/asterisk/stasis_cache_pattern.h<br>M include/asterisk/stasis_message_router.h<br>M main/ccss.c<br>M main/devicestate.c<br>M main/endpoints.c<br>M main/manager.c<br>M main/pbx.c<br>M main/presencestate.c<br>M main/stasis.c<br>M main/stasis_cache.c<br>M main/stasis_cache_pattern.c<br>M main/stasis_message.c<br>M main/stasis_message_router.c<br>M res/parking/parking_applications.c<br>M res/parking/parking_bridge_features.c<br>M res/parking/parking_manager.c<br>M res/res_hep_rtcp.c<br>M res/res_pjsip_mwi.c<br>M res/res_pjsip_outbound_registration.c<br>M res/res_pjsip_publish_asterisk.c<br>M res/res_pjsip_refer.c<br>M res/res_security_log.c<br>M res/res_stasis_device_state.c<br>M res/res_xmpp.c<br>32 files changed, 303 insertions(+), 12 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/79/10479/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/apps/app_queue.c b/apps/app_queue.c</span><br><span>index 3dc735a..0a0d70d 100644</span><br><span>--- a/apps/app_queue.c</span><br><span>+++ b/apps/app_queue.c</span><br><span>@@ -11334,6 +11334,7 @@</span><br><span>      if (!device_state_sub) {</span><br><span>             err = -1;</span><br><span>    }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());</span><br><span> </span><br><span>      manager_topic = ast_manager_get_topic();</span><br><span>     queue_topic = ast_queue_topic_all();</span><br><span>diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c</span><br><span>index f4f6514..4a12404 100644</span><br><span>--- a/channels/chan_dahdi.c</span><br><span>+++ b/channels/chan_dahdi.c</span><br><span>@@ -12594,6 +12594,9 @@</span><br><span>                               * knows that we care about it.  Then, chan_dahdi will get the MWI from the</span><br><span>                           * event cache instead of checking the mailbox directly. */</span><br><span>                          tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+                             if (tmp->mwi_event_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                                  stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+                         }</span><br><span>                    }</span><br><span>            }</span><br><span> #ifdef HAVE_DAHDI_LINEREVERSE_VMWI</span><br><span>diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c</span><br><span>index 01d42b5..a7c7d18 100644</span><br><span>--- a/channels/chan_iax2.c</span><br><span>+++ b/channels/chan_iax2.c</span><br><span>@@ -1456,6 +1456,9 @@</span><br><span>         if (!network_change_sub) {</span><br><span>           network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span>                    network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+              if (network_change_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                     stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+               }</span><br><span>    }</span><br><span> }</span><br><span> </span><br><span>@@ -1469,6 +1472,9 @@</span><br><span>   if (!acl_change_sub) {</span><br><span>               acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span>                      acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+          if (acl_change_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                 stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+         }</span><br><span>    }</span><br><span> }</span><br><span> </span><br><span>@@ -13072,6 +13078,9 @@</span><br><span>                          * mailboxes.  However, we just grab the events out of the cache when it</span><br><span>                      * is time to send MWI, since it is only sent with a REGACK. */</span><br><span>                      peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+                    if (peer->mwi_event_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                         stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+                        }</span><br><span>            }</span><br><span>    }</span><br><span> </span><br><span>diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c</span><br><span>index 2ac7690..5eee6e9 100644</span><br><span>--- a/channels/chan_mgcp.c</span><br><span>+++ b/channels/chan_mgcp.c</span><br><span>@@ -4242,6 +4242,9 @@</span><br><span>                                            * knows that we care about it.  Then, chan_mgcp will get the MWI from the</span><br><span>                                            * event cache instead of checking the mailbox directly. */</span><br><span>                                          e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+                                               if (e->mwi_event_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                                                    stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+                                           }</span><br><span>                                    }</span><br><span>                            }</span><br><span>                            snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());</span><br><span>diff --git a/channels/chan_sip.c b/channels/chan_sip.c</span><br><span>index 42362b8..d6186cc 100644</span><br><span>--- a/channels/chan_sip.c</span><br><span>+++ b/channels/chan_sip.c</span><br><span>@@ -17494,6 +17494,9 @@</span><br><span>     if (!network_change_sub) {</span><br><span>           network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span>                    network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+              if (network_change_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                     stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+               }</span><br><span>    }</span><br><span> }</span><br><span> </span><br><span>@@ -17507,6 +17510,9 @@</span><br><span>         if (!acl_change_sub) {</span><br><span>               acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span>                      acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+          if (acl_change_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                 stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+         }</span><br><span>    }</span><br><span> </span><br><span> }</span><br><span>@@ -28379,6 +28385,10 @@</span><br><span>                mailbox_specific_topic = ast_mwi_topic(mailbox->id);</span><br><span>              if (mailbox_specific_topic) {</span><br><span>                        mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);</span><br><span style="color: hsl(120, 100%, 40%);">+                    if (mailbox->event_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                          stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+                         stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+                    }</span><br><span>            }</span><br><span>    }</span><br><span> }</span><br><span>diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c</span><br><span>index 2b13e5e..21671e2 100644</span><br><span>--- a/channels/chan_skinny.c</span><br><span>+++ b/channels/chan_skinny.c</span><br><span>@@ -8334,6 +8334,9 @@</span><br><span>          mailbox_specific_topic = ast_mwi_topic(l->mailbox);</span><br><span>               if (mailbox_specific_topic) {</span><br><span>                        l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);</span><br><span style="color: hsl(120, 100%, 40%);">+                 if (l->mwi_event_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                            stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+                   }</span><br><span>            }</span><br><span>    }</span><br><span> </span><br><span>diff --git a/channels/sig_pri.c b/channels/sig_pri.c</span><br><span>index fbc4e40..e968602 100644</span><br><span>--- a/channels/sig_pri.c</span><br><span>+++ b/channels/sig_pri.c</span><br><span>@@ -9130,6 +9130,8 @@</span><br><span>           if (!pri->mbox[i].sub) {</span><br><span>                  ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",</span><br><span>                           sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);</span><br><span style="color: hsl(120, 100%, 40%);">+         } else {</span><br><span style="color: hsl(120, 100%, 40%);">+                      stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());</span><br><span>          }</span><br><span> #if defined(HAVE_PRI_MWI_V2)</span><br><span>            if (ast_strlen_zero(pri->mbox[i].vm_number)) {</span><br><span>diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h</span><br><span>index 8329dd0..650a296 100644</span><br><span>--- a/include/asterisk/stasis.h</span><br><span>+++ b/include/asterisk/stasis.h</span><br><span>@@ -327,6 +327,14 @@</span><br><span> unsigned int stasis_message_type_hash(const struct stasis_message_type *type);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Gets the id of a given message type</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The type to get the id of.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \return The id</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_message_type_id(const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief Check whether a message type is declined</span><br><span>  *</span><br><span>  * \param name The name of the message type to check</span><br><span>@@ -490,6 +498,14 @@</span><br><span> const char *stasis_topic_name(const struct stasis_topic *topic);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Return the number of subscribers of a topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param topic Topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \return Number of subscribers of the topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+size_t stasis_topic_subscribers(const struct stasis_topic *topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief Publish a message to a topic's subscribers.</span><br><span>  * \param topic Topic.</span><br><span>  * \param message Message to publish.</span><br><span>@@ -554,6 +570,10 @@</span><br><span>  * \return New \ref stasis_subscription object.</span><br><span>  * \return \c NULL on error.</span><br><span>  * \since 12</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note This callback will receive a callback with a message indicating it</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been subscribed. This occurs immediately before accepted message</span><br><span style="color: hsl(120, 100%, 40%);">+ * types can be set and the callback must expect to receive it.</span><br><span>  */</span><br><span> struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,</span><br><span>   stasis_subscription_cb callback, void *data);</span><br><span>@@ -579,11 +599,47 @@</span><br><span>  * \return New \ref stasis_subscription object.</span><br><span>  * \return \c NULL on error.</span><br><span>  * \since 12.8.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note This callback will receive a callback with a message indicating it</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been subscribed. This occurs immediately before accepted message</span><br><span style="color: hsl(120, 100%, 40%);">+ * types can be set and the callback must expect to receive it.</span><br><span>  */</span><br><span> struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,</span><br><span>      stasis_subscription_cb callback, void *data);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a subscription that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the subscription to allow the given message type to be</span><br><span style="color: hsl(120, 100%, 40%);">+ * raised to our subscription callback. This enables internal filtering in</span><br><span style="color: hsl(120, 100%, 40%);">+ * the stasis message bus to reduce messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription to add message type to.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note If you are wanting to use stasis_final_message you will need to accept</span><br><span style="color: hsl(120, 100%, 40%);">+ * \ref stasis_subscription_change_type as a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+   struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a subscription that we are interested in ALL messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the subscription to raise all messages to our subscription</span><br><span style="color: hsl(120, 100%, 40%);">+ * callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription that should receive all messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+void stasis_subscription_accept_all(struct stasis_subscription *subscription);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief Cancel a subscription.</span><br><span>  *</span><br><span>  * Note that in an asynchronous system, there may still be messages queued or</span><br><span>@@ -1032,6 +1088,23 @@</span><br><span>    struct stasis_caching_topic *caching_topic);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a caching topic that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the caching topic to receive messages of the given message</span><br><span style="color: hsl(120, 100%, 40%);">+ * type. This enables internal filtering in the stasis message bus to reduce</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param caching_topic The caching topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+   struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief A message which instructs the caching topic to remove an entry from</span><br><span>  * its cache.</span><br><span>  *</span><br><span>@@ -1221,6 +1294,12 @@</span><br><span> void stasis_log_bad_type_access(const char *name);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief The maximum number of Stasis message types that can be registered.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+#define STASIS_MESSAGE_TYPES_MAXIMUM 256</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span>  * \brief Boiler-plate messaging macro for defining public message types.</span><br><span>  *</span><br><span>  * \code</span><br><span>diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h</span><br><span>index e61d3e9..c0626f8 100644</span><br><span>--- a/include/asterisk/stasis_cache_pattern.h</span><br><span>+++ b/include/asterisk/stasis_cache_pattern.h</span><br><span>@@ -169,4 +169,21 @@</span><br><span> struct stasis_topic *stasis_cp_single_topic_cached(</span><br><span>      struct stasis_cp_single *one);</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to an instance that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the caching topic to receive messages of the given message</span><br><span style="color: hsl(120, 100%, 40%);">+ * type. This enables internal filtering in the stasis message bus to reduce</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param one One side of the cache pattern.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */</span><br><span>diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h</span><br><span>index 50270a7..8dcdfcc 100644</span><br><span>--- a/include/asterisk/stasis_message_router.h</span><br><span>+++ b/include/asterisk/stasis_message_router.h</span><br><span>@@ -233,6 +233,10 @@</span><br><span>  * \retval -1 on failure</span><br><span>  *</span><br><span>  * \since 12</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note Setting a default callback will automatically cause the underlying</span><br><span style="color: hsl(120, 100%, 40%);">+ * subscription to receive all messages and not be filtered. If filtering is</span><br><span style="color: hsl(120, 100%, 40%);">+ * desired then a specific route for each message type should be provided.</span><br><span>  */</span><br><span> int stasis_message_router_set_default(struct stasis_message_router *router,</span><br><span>                                       stasis_subscription_cb callback,</span><br><span>diff --git a/main/ccss.c b/main/ccss.c</span><br><span>index 9cf16e3..03db6bb 100644</span><br><span>--- a/main/ccss.c</span><br><span>+++ b/main/ccss.c</span><br><span>@@ -1433,6 +1433,7 @@</span><br><span>              cc_unref(generic_list, "Failed to subscribe to device state");</span><br><span>             return NULL;</span><br><span>         }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());</span><br><span>      generic_list->current_state = ast_device_state(monitor->interface->device_name);</span><br><span>    ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");</span><br><span>   return generic_list;</span><br><span>@@ -2804,6 +2805,8 @@</span><br><span>         if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {</span><br><span>            return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+        stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());</span><br><span>     cc_ref(agent, "Ref agent for subscription");</span><br><span>       return 0;</span><br><span> }</span><br><span>diff --git a/main/devicestate.c b/main/devicestate.c</span><br><span>index 7dcbe82..86fe372 100644</span><br><span>--- a/main/devicestate.c</span><br><span>+++ b/main/devicestate.c</span><br><span>@@ -920,6 +920,7 @@</span><br><span>    if (!device_state_topic_cached) {</span><br><span>            return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());</span><br><span> </span><br><span>  devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),</span><br><span>                devstate_change_cb, NULL);</span><br><span>@@ -927,6 +928,7 @@</span><br><span>             ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");</span><br><span>            return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());</span><br><span> </span><br><span>  return 0;</span><br><span> }</span><br><span>diff --git a/main/endpoints.c b/main/endpoints.c</span><br><span>index 992da1f..ad45053 100644</span><br><span>--- a/main/endpoints.c</span><br><span>+++ b/main/endpoints.c</span><br><span>@@ -200,7 +200,7 @@</span><br><span>    endpoint_publish_snapshot(endpoint);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-static void endpoint_default(void *data,</span><br><span style="color: hsl(120, 100%, 40%);">+static void endpoint_subscription_change(void *data,</span><br><span>  struct stasis_subscription *sub,</span><br><span>     struct stasis_message *message)</span><br><span> {</span><br><span>@@ -261,6 +261,7 @@</span><br><span>           if (!endpoint->topics) {</span><br><span>                  return NULL;</span><br><span>                 }</span><br><span style="color: hsl(120, 100%, 40%);">+             stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());</span><br><span> </span><br><span>                 endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));</span><br><span>               if (!endpoint->router) {</span><br><span>@@ -269,8 +270,9 @@</span><br><span>            r |= stasis_message_router_add(endpoint->router,</span><br><span>                  ast_channel_snapshot_type(), endpoint_cache_clear,</span><br><span>                   endpoint);</span><br><span style="color: hsl(0, 100%, 40%);">-              r |= stasis_message_router_set_default(endpoint->router,</span><br><span style="color: hsl(0, 100%, 40%);">-                     endpoint_default, endpoint);</span><br><span style="color: hsl(120, 100%, 40%);">+          r |= stasis_message_router_add(endpoint->router,</span><br><span style="color: hsl(120, 100%, 40%);">+                   stasis_subscription_change_type(), endpoint_subscription_change,</span><br><span style="color: hsl(120, 100%, 40%);">+                      endpoint);</span><br><span>           if (r) {</span><br><span>                     return NULL;</span><br><span>                 }</span><br><span>@@ -286,6 +288,7 @@</span><br><span>              if (!endpoint->topics) {</span><br><span>                  return NULL;</span><br><span>                 }</span><br><span style="color: hsl(120, 100%, 40%);">+             stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());</span><br><span> </span><br><span>                 ao2_link(tech_endpoints, endpoint);</span><br><span>  }</span><br><span>diff --git a/main/manager.c b/main/manager.c</span><br><span>index 9d67e0c..6e6d9d8 100644</span><br><span>--- a/main/manager.c</span><br><span>+++ b/main/manager.c</span><br><span>@@ -1527,6 +1527,9 @@</span><br><span>       if (!acl_change_sub) {</span><br><span>               acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span>                      acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+          if (acl_change_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                 stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+         }</span><br><span>    }</span><br><span> }</span><br><span> </span><br><span>diff --git a/main/pbx.c b/main/pbx.c</span><br><span>index 727018b..9718133 100644</span><br><span>--- a/main/pbx.c</span><br><span>+++ b/main/pbx.c</span><br><span>@@ -8416,10 +8416,13 @@</span><br><span>    if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {</span><br><span>           return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+   stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());</span><br><span> </span><br><span>   if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {</span><br><span>             return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());</span><br><span> </span><br><span>  return 0;</span><br><span> }</span><br><span>diff --git a/main/presencestate.c b/main/presencestate.c</span><br><span>index 4121bf5..e287748 100644</span><br><span>--- a/main/presencestate.c</span><br><span>+++ b/main/presencestate.c</span><br><span>@@ -514,6 +514,7 @@</span><br><span>    if (!presence_state_topic_cached) {</span><br><span>          return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());</span><br><span> </span><br><span>      AST_TEST_REGISTER(test_presence_chan);</span><br><span> </span><br><span>diff --git a/main/stasis.c b/main/stasis.c</span><br><span>index 51f01c0..8815ef3 100644</span><br><span>--- a/main/stasis.c</span><br><span>+++ b/main/stasis.c</span><br><span>@@ -370,6 +370,18 @@</span><br><span>   return topic->name;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+size_t stasis_topic_subscribers(const struct stasis_topic *topic)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_VECTOR_SIZE(&topic->subscribers);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*! \internal */</span><br><span style="color: hsl(120, 100%, 40%);">+enum stasis_all_messages {</span><br><span style="color: hsl(120, 100%, 40%);">+        STASIS_ALL_MESSAGES_DISABLED = 0, /*! The subscription is selective on message types */</span><br><span style="color: hsl(120, 100%, 40%);">+       STASIS_ALL_MESSAGES_DEFAULT,      /*! The subscription is by default accepting all */</span><br><span style="color: hsl(120, 100%, 40%);">+ STASIS_ALL_MESSAGES_FORCED,       /*! The subscription has forced all messages */</span><br><span style="color: hsl(120, 100%, 40%);">+};</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \internal */</span><br><span> struct stasis_subscription {</span><br><span>  /*! Unique ID for this subscription */</span><br><span>@@ -391,6 +403,11 @@</span><br><span>        /*! Flag set when final message for sub has been processed.</span><br><span>   *  Be sure join_lock is held before reading/setting. */</span><br><span>     int final_message_processed;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        /*! The message types this subscription is accepting */</span><br><span style="color: hsl(120, 100%, 40%);">+       char accepted_message_types[STASIS_MESSAGE_TYPES_MAXIMUM];</span><br><span style="color: hsl(120, 100%, 40%);">+    /*! Whether all messages are being accepted by the subscription */</span><br><span style="color: hsl(120, 100%, 40%);">+    enum stasis_all_messages all_messages;</span><br><span> };</span><br><span> </span><br><span> static void subscription_dtor(void *obj)</span><br><span>@@ -420,19 +437,24 @@</span><br><span> static void subscription_invoke(struct stasis_subscription *sub,</span><br><span>                               struct stasis_message *message)</span><br><span> {</span><br><span style="color: hsl(120, 100%, 40%);">+        unsigned int final = stasis_subscription_final_message(sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ int message_type_id = stasis_message_type_id(stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>   /* Notify that the final message has been received */</span><br><span style="color: hsl(0, 100%, 40%);">-   if (stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+        if (final) {</span><br><span>                 ao2_lock(sub);</span><br><span>               sub->final_message_rxed = 1;</span><br><span>              ast_cond_signal(&sub->join_cond);</span><br><span>             ao2_unlock(sub);</span><br><span>     }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   /* Since sub is mostly immutable, no need to lock sub */</span><br><span style="color: hsl(0, 100%, 40%);">-        sub->callback(sub->data, sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!final || sub->all_messages != STASIS_ALL_MESSAGES_DISABLED || sub->accepted_message_types[message_type_id]) {</span><br><span style="color: hsl(120, 100%, 40%);">+              /* Since sub is mostly immutable, no need to lock sub */</span><br><span style="color: hsl(120, 100%, 40%);">+              sub->callback(sub->data, sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> </span><br><span>        /* Notify that the final message has been processed */</span><br><span style="color: hsl(0, 100%, 40%);">-  if (stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+        if (final) {</span><br><span>                 ao2_lock(sub);</span><br><span>               sub->final_message_processed = 1;</span><br><span>                 ast_cond_signal(&sub->join_cond);</span><br><span>@@ -500,6 +522,7 @@</span><br><span>       sub->callback = callback;</span><br><span>         sub->data = data;</span><br><span>         ast_cond_init(&sub->join_cond, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+  sub->all_messages = STASIS_ALL_MESSAGES_DEFAULT;</span><br><span> </span><br><span>      if (topic_add_subscription(topic, sub) != 0) {</span><br><span>               ao2_ref(sub, -1);</span><br><span>@@ -583,6 +606,38 @@</span><br><span>     return res;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+   struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+  if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+          return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+    }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+     subscription->accepted_message_types[stasis_message_type_id(type)] = 1;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+  /* If the subscription is still in the default accept all switch it to</span><br><span style="color: hsl(120, 100%, 40%);">+         * selective.</span><br><span style="color: hsl(120, 100%, 40%);">+  */</span><br><span style="color: hsl(120, 100%, 40%);">+   if (subscription->all_messages == STASIS_ALL_MESSAGES_DEFAULT) {</span><br><span style="color: hsl(120, 100%, 40%);">+           subscription->all_messages = STASIS_ALL_MESSAGES_DISABLED;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+     ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+void stasis_subscription_accept_all(struct stasis_subscription *subscription)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+      if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+          return;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+     subscription->all_messages = STASIS_ALL_MESSAGES_FORCED;</span><br><span style="color: hsl(120, 100%, 40%);">+   ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> void stasis_subscription_join(struct stasis_subscription *subscription)</span><br><span> {</span><br><span>     if (subscription) {</span><br><span>@@ -778,6 +833,15 @@</span><br><span>   struct stasis_message *message,</span><br><span>      int synchronous)</span><br><span> {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Determine if this subscription is interested in this message. Note that final</span><br><span style="color: hsl(120, 100%, 40%);">+       * messages are special and are always invoked on the subscription.</span><br><span style="color: hsl(120, 100%, 40%);">+    */</span><br><span style="color: hsl(120, 100%, 40%);">+   if (sub->all_messages == STASIS_ALL_MESSAGES_DISABLED &&</span><br><span style="color: hsl(120, 100%, 40%);">+           !sub->accepted_message_types[stasis_message_type_id(stasis_message_type(message))] &&</span><br><span style="color: hsl(120, 100%, 40%);">+              !stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+           return;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  if (!sub->mailbox) {</span><br><span>              /* Dispatch directly */</span><br><span>              subscription_invoke(sub, message);</span><br><span>@@ -837,6 +901,11 @@</span><br><span>    ast_assert(topic != NULL);</span><br><span>   ast_assert(message != NULL);</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+      /* If there are no subscribers don't bother */</span><br><span style="color: hsl(120, 100%, 40%);">+    if (!stasis_topic_subscribers(topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+               return;</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  /*</span><br><span>    * The topic may be unref'ed by the subscription invocation.</span><br><span>      * Make sure we hold onto a reference while dispatching.</span><br><span>diff --git a/main/stasis_cache.c b/main/stasis_cache.c</span><br><span>index 3d353b3..d03758d 100644</span><br><span>--- a/main/stasis_cache.c</span><br><span>+++ b/main/stasis_cache.c</span><br><span>@@ -87,6 +87,20 @@</span><br><span>       return caching_topic->topic;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+  struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+  /* We wait to accept the stasis specific message types until now so that by default everything</span><br><span style="color: hsl(120, 100%, 40%);">+         * will flow to us.</span><br><span style="color: hsl(120, 100%, 40%);">+    */</span><br><span style="color: hsl(120, 100%, 40%);">+   if (stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type()) ||</span><br><span style="color: hsl(120, 100%, 40%);">+              stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type()) ||</span><br><span style="color: hsl(120, 100%, 40%);">+          stasis_subscription_accept_message_type(caching_topic->sub, type)) {</span><br><span style="color: hsl(120, 100%, 40%);">+               return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+    }</span><br><span style="color: hsl(120, 100%, 40%);">+     return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)</span><br><span> {</span><br><span>   if (!caching_topic) {</span><br><span>@@ -856,11 +870,13 @@</span><br><span>                /* Update the cache */</span><br><span>               snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);</span><br><span>          if (snapshots.old || msg_put) {</span><br><span style="color: hsl(0, 100%, 40%);">-                 update = update_create(snapshots.old, msg_put);</span><br><span style="color: hsl(0, 100%, 40%);">-                 if (update) {</span><br><span style="color: hsl(0, 100%, 40%);">-                           stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+                      if (stasis_topic_subscribers(caching_topic->topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+                              update = update_create(snapshots.old, msg_put);</span><br><span style="color: hsl(120, 100%, 40%);">+                               if (update) {</span><br><span style="color: hsl(120, 100%, 40%);">+                                 stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+                                      ao2_ref(update, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+                          }</span><br><span>                    }</span><br><span style="color: hsl(0, 100%, 40%);">-                       ao2_cleanup(update);</span><br><span>                 } else {</span><br><span>                     ast_debug(1,</span><br><span>                                 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",</span><br><span>@@ -868,7 +884,7 @@</span><br><span>                              stasis_message_type_name(msg_type), msg_id);</span><br><span>                 }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-           if (snapshots.aggregate_old != snapshots.aggregate_new) {</span><br><span style="color: hsl(120, 100%, 40%);">+             if (stasis_topic_subscribers(caching_topic->topic) && snapshots.aggregate_old != snapshots.aggregate_new) {</span><br><span>                       if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {</span><br><span>                           caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,</span><br><span>                                   snapshots.aggregate_new);</span><br><span>diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c</span><br><span>index f0e34b9..05d95a5 100644</span><br><span>--- a/main/stasis_cache_pattern.c</span><br><span>+++ b/main/stasis_cache_pattern.c</span><br><span>@@ -217,3 +217,12 @@</span><br><span>    }</span><br><span>    return stasis_caching_get_topic(one->topic_cached);</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+  if (!one) {</span><br><span style="color: hsl(120, 100%, 40%);">+           return NULL;</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+     return stasis_caching_accept_message_type(one->topic_cached, type);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span>diff --git a/main/stasis_message.c b/main/stasis_message.c</span><br><span>index 19f4a92..50dbae1 100644</span><br><span>--- a/main/stasis_message.c</span><br><span>+++ b/main/stasis_message.c</span><br><span>@@ -39,9 +39,11 @@</span><br><span>      struct stasis_message_vtable *vtable;</span><br><span>        char *name;</span><br><span>  unsigned int hash;</span><br><span style="color: hsl(120, 100%, 40%);">+    int id;</span><br><span> };</span><br><span> </span><br><span> static struct stasis_message_vtable null_vtable = {};</span><br><span style="color: hsl(120, 100%, 40%);">+static int message_type_id;</span><br><span> </span><br><span> static void message_type_dtor(void *obj)</span><br><span> {</span><br><span>@@ -61,6 +63,12 @@</span><br><span>            return STASIS_MESSAGE_TYPE_DECLINED;</span><br><span>         }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ /* If there is no more space for message type registration then decline this type */</span><br><span style="color: hsl(120, 100%, 40%);">+  if (message_type_id == STASIS_MESSAGE_TYPES_MAXIMUM) {</span><br><span style="color: hsl(120, 100%, 40%);">+                ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to maximum number of registered message types being reached\n", name);</span><br><span style="color: hsl(120, 100%, 40%);">+              return STASIS_MESSAGE_TYPE_DECLINED;</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  type = ao2_t_alloc_options(sizeof(*type), message_type_dtor,</span><br><span>                 AO2_ALLOC_OPT_LOCK_NOLOCK, name ?: "");</span><br><span>    if (!type) {</span><br><span>@@ -78,6 +86,7 @@</span><br><span>     }</span><br><span>    type->hash = ast_hashtab_hash_string(name);</span><br><span>       type->vtable = vtable;</span><br><span style="color: hsl(120, 100%, 40%);">+     type->id = ast_atomic_fetchadd_int(&message_type_id, +1);</span><br><span>     *result = type;</span><br><span> </span><br><span>  return STASIS_MESSAGE_TYPE_SUCCESS;</span><br><span>@@ -93,6 +102,11 @@</span><br><span>    return type->hash;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_message_type_id(const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return type->id;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \internal */</span><br><span> struct stasis_message {</span><br><span>      /*! Time the message was created */</span><br><span>diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c</span><br><span>index 41d426b..7e04d46 100644</span><br><span>--- a/main/stasis_message_router.c</span><br><span>+++ b/main/stasis_message_router.c</span><br><span>@@ -235,6 +235,9 @@</span><br><span>               return NULL;</span><br><span>         }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ /* We need to receive subscription change messages so we know when our subscription goes away */</span><br><span style="color: hsl(120, 100%, 40%);">+      stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>       return router;</span><br><span> }</span><br><span> </span><br><span>@@ -316,6 +319,9 @@</span><br><span>        }</span><br><span>    ao2_lock(router);</span><br><span>    res = route_table_add(&router->routes, message_type, callback, data);</span><br><span style="color: hsl(120, 100%, 40%);">+  if (!res) {</span><br><span style="color: hsl(120, 100%, 40%);">+           stasis_subscription_accept_message_type(router->subscription, message_type);</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span>    ao2_unlock(router);</span><br><span>  return res;</span><br><span> }</span><br><span>@@ -334,6 +340,9 @@</span><br><span>       }</span><br><span>    ao2_lock(router);</span><br><span>    res = route_table_add(&router->cache_routes, message_type, callback, data);</span><br><span style="color: hsl(120, 100%, 40%);">+    if (!res) {</span><br><span style="color: hsl(120, 100%, 40%);">+           stasis_subscription_accept_message_type(router->subscription, message_type);</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span>    ao2_unlock(router);</span><br><span>  return res;</span><br><span> }</span><br><span>@@ -378,6 +387,9 @@</span><br><span>       router->default_route.callback = callback;</span><br><span>        router->default_route.data = data;</span><br><span>        ao2_unlock(router);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_all(router->subscription);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>   /* While this implementation can never fail, it used to be able to */</span><br><span>        return 0;</span><br><span> }</span><br><span>diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c</span><br><span>index dd2fb75..3ecfc4e 100644</span><br><span>--- a/res/parking/parking_applications.c</span><br><span>+++ b/res/parking/parking_applications.c</span><br><span>@@ -954,6 +954,9 @@</span><br><span>            return -1;</span><br><span>   }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+        stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  /* Now for the fun part... park it! */</span><br><span>       ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);</span><br><span> </span><br><span>diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c</span><br><span>index cbc23fa..d8252a7 100644</span><br><span>--- a/res/parking/parking_bridge_features.c</span><br><span>+++ b/res/parking/parking_bridge_features.c</span><br><span>@@ -213,6 +213,7 @@</span><br><span>         if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {</span><br><span>                 return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());</span><br><span> </span><br><span>       datastore->data = parked_datastore;</span><br><span> </span><br><span>diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c</span><br><span>index 6d0a4c0..764f39b 100644</span><br><span>--- a/res/parking/parking_manager.c</span><br><span>+++ b/res/parking/parking_manager.c</span><br><span>@@ -686,6 +686,9 @@</span><br><span> {</span><br><span>       if (!parking_sub) {</span><br><span>          parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+          if (parking_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+                    stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+         }</span><br><span>    }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_hep_rtcp.c b/res/res_hep_rtcp.c</span><br><span>index c3abbc1..85ad9b5 100644</span><br><span>--- a/res/res_hep_rtcp.c</span><br><span>+++ b/res/res_hep_rtcp.c</span><br><span>@@ -167,6 +167,8 @@</span><br><span>        if (!stasis_rtp_subscription) {</span><br><span>              return AST_MODULE_LOAD_DECLINE;</span><br><span>      }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());</span><br><span style="color: hsl(120, 100%, 40%);">+   stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());</span><br><span> </span><br><span>  return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span>diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c</span><br><span>index 4cd892c..a434f48 100644</span><br><span>--- a/res/res_pjsip_mwi.c</span><br><span>+++ b/res/res_pjsip_mwi.c</span><br><span>@@ -269,6 +269,8 @@</span><br><span>          ao2_ref(mwi_sub, -1);</span><br><span>                mwi_stasis_sub = NULL;</span><br><span>       }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());</span><br><span>   return mwi_stasis_sub;</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c</span><br><span>index 0d815ad..b38096c 100644</span><br><span>--- a/res/res_pjsip_outbound_registration.c</span><br><span>+++ b/res/res_pjsip_outbound_registration.c</span><br><span>@@ -2282,6 +2282,9 @@</span><br><span> </span><br><span>         network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span>            network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+      if (network_change_sub) {</span><br><span style="color: hsl(120, 100%, 40%);">+             stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+       }</span><br><span> </span><br><span>        return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span>diff --git a/res/res_pjsip_publish_asterisk.c b/res/res_pjsip_publish_asterisk.c</span><br><span>index 220ba0b..841b827 100644</span><br><span>--- a/res/res_pjsip_publish_asterisk.c</span><br><span>+++ b/res/res_pjsip_publish_asterisk.c</span><br><span>@@ -360,6 +360,8 @@</span><br><span>              ao2_ref(datastore, -1);</span><br><span>              return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+      stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());</span><br><span> </span><br><span>       cached = stasis_cache_dump(ast_device_state_cache(), NULL);</span><br><span>  ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);</span><br><span>@@ -435,6 +437,8 @@</span><br><span>               ao2_ref(datastore, -1);</span><br><span>              return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+        stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());</span><br><span> </span><br><span>      cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);</span><br><span>     ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);</span><br><span>diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c</span><br><span>index 1e6ca7f..5e9666a 100644</span><br><span>--- a/res/res_pjsip_refer.c</span><br><span>+++ b/res/res_pjsip_refer.c</span><br><span>@@ -686,6 +686,8 @@</span><br><span>                      ast_channel_unlock(chan);</span><br><span> </span><br><span>                        ao2_cleanup(refer->progress);</span><br><span style="color: hsl(120, 100%, 40%);">+              } else {</span><br><span style="color: hsl(120, 100%, 40%);">+                      stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());</span><br><span>               }</span><br><span>    }</span><br><span> </span><br><span>diff --git a/res/res_security_log.c b/res/res_security_log.c</span><br><span>index 555ba23..a50a8c8 100644</span><br><span>--- a/res/res_security_log.c</span><br><span>+++ b/res/res_security_log.c</span><br><span>@@ -141,6 +141,7 @@</span><br><span>             LOG_SECURITY = -1;</span><br><span>           return AST_MODULE_LOAD_DECLINE;</span><br><span>      }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());</span><br><span> </span><br><span>         ast_verb(3, "Security Logging Enabled\n");</span><br><span> </span><br><span>diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c</span><br><span>index be09b15..bbe3f27 100644</span><br><span>--- a/res/res_stasis_device_state.c</span><br><span>+++ b/res/res_stasis_device_state.c</span><br><span>@@ -394,6 +394,8 @@</span><br><span>              ao2_ref(sub, -1);</span><br><span>            return -1;</span><br><span>   }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+        stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());</span><br><span> </span><br><span>         ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);</span><br><span>         ao2_unlock(device_state_subscriptions);</span><br><span>diff --git a/res/res_xmpp.c b/res/res_xmpp.c</span><br><span>index b72581f..e51d7d2 100644</span><br><span>--- a/res/res_xmpp.c</span><br><span>+++ b/res/res_xmpp.c</span><br><span>@@ -1626,11 +1626,13 @@</span><br><span>       if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {</span><br><span>                return;</span><br><span>      }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());</span><br><span> </span><br><span>       if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {</span><br><span>              client->mwi_sub = stasis_unsubscribe(client->mwi_sub);</span><br><span>                 return;</span><br><span>      }</span><br><span style="color: hsl(120, 100%, 40%);">+     stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());</span><br><span> </span><br><span>   cached = stasis_cache_dump(ast_device_state_cache(), NULL);</span><br><span>  ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/10479">change 10479</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/10479"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>
<div style="display:none"> Gerrit-Change-Id: I99bee23895baa0a117985d51683f7963b77aa190 </div>
<div style="display:none"> Gerrit-Change-Number: 10479 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: Joshua Colp <jcolp@digium.com> </div>