<p>Joshua Colp <strong>merged</strong> this change.</p><p><a href="https://gerrit.asterisk.org/10640">View Change</a></p><div style="white-space:pre-wrap">Approvals:
Corey Farrell: Looks good to me, but someone else must approve
George Joseph: Looks good to me, approved
Joshua Colp: Approved for Submit
</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">stasis: Add internal filtering of messages.<br><br>This change adds the ability for subscriptions to indicate<br>which message types they are interested in accepting. The<br>filtering is then done before a message is dispatched<br>to the subscriber, reducing the amount of work that has<br>to be done.<br><br>This is optional: if a subscriber does not add<br>message types it wishes to accept and set the subscription<br>to selective filtering, the previous behavior is preserved<br>and it receives all messages.<br><br>There is also the ability to explicitly force the reception<br>of all messages for cases such as AMI or ARI, where a large<br>number of messages are expected and are then generically<br>converted into a different format.<br><br>ASTERISK-28103<br><br>Change-Id: I99bee23895baa0a117985d51683f7963b77aa190<br>---<br>M apps/app_queue.c<br>M channels/chan_dahdi.c<br>M channels/chan_iax2.c<br>M channels/chan_mgcp.c<br>M channels/chan_sip.c<br>M channels/chan_skinny.c<br>M channels/sig_pri.c<br>M include/asterisk/stasis.h<br>M include/asterisk/stasis_cache_pattern.h<br>M include/asterisk/stasis_message_router.h<br>M main/ccss.c<br>M main/devicestate.c<br>M main/endpoints.c<br>M main/manager.c<br>M main/pbx.c<br>M main/presencestate.c<br>M main/stasis.c<br>M main/stasis_cache.c<br>M main/stasis_cache_pattern.c<br>M main/stasis_message.c<br>M main/stasis_message_router.c<br>M res/parking/parking_applications.c<br>M res/parking/parking_bridge_features.c<br>M res/parking/parking_manager.c<br>M res/res_hep_rtcp.c<br>M res/res_pjsip_mwi.c<br>M res/res_pjsip_outbound_registration.c<br>M res/res_pjsip_publish_asterisk.c<br>M res/res_pjsip_pubsub.c<br>M res/res_pjsip_refer.c<br>M res/res_security_log.c<br>M res/res_stasis_device_state.c<br>M res/res_xmpp.c<br>33 files changed, 457 insertions(+), 17 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: 
pre-wrap;"><span>diff --git a/apps/app_queue.c b/apps/app_queue.c</span><br><span>index ddbd1bd..ce00b5e 100644</span><br><span>--- a/apps/app_queue.c</span><br><span>+++ b/apps/app_queue.c</span><br><span>@@ -11007,6 +11007,8 @@</span><br><span> if (!device_state_sub) {</span><br><span> err = -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> manager_topic = ast_manager_get_topic();</span><br><span> queue_topic = ast_queue_topic_all();</span><br><span>diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c</span><br><span>index 3b05d72..c40e084 100644</span><br><span>--- a/channels/chan_dahdi.c</span><br><span>+++ b/channels/chan_dahdi.c</span><br><span>@@ -12690,6 +12690,8 @@</span><br><span> * knows that we care about it. Then, chan_dahdi will get the MWI from the</span><br><span> * event cache instead of checking the mailbox directly. 
*/</span><br><span> tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> #ifdef HAVE_DAHDI_LINEREVERSE_VMWI</span><br><span>diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c</span><br><span>index 7a369e1..d534c00 100644</span><br><span>--- a/channels/chan_iax2.c</span><br><span>+++ b/channels/chan_iax2.c</span><br><span>@@ -1463,6 +1463,8 @@</span><br><span> if (!network_change_sub) {</span><br><span> network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span> network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>@@ -1476,6 +1478,8 @@</span><br><span> if (!acl_change_sub) {</span><br><span> acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span> acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>@@ -13100,6 +13104,8 @@</span><br><span> * mailboxes. However, we just grab the events out of the cache when it</span><br><span> * is time to send MWI, since it is only sent with a REGACK. 
*/</span><br><span> peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c</span><br><span>index c9ed1e2..5b3089b 100644</span><br><span>--- a/channels/chan_mgcp.c</span><br><span>+++ b/channels/chan_mgcp.c</span><br><span>@@ -4234,6 +4234,8 @@</span><br><span> * knows that we care about it. Then, chan_mgcp will get the MWI from the</span><br><span> * event cache instead of checking the mailbox directly. */</span><br><span> e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());</span><br><span>diff --git a/channels/chan_sip.c b/channels/chan_sip.c</span><br><span>index 97ce93c..49d5f64 100644</span><br><span>--- a/channels/chan_sip.c</span><br><span>+++ b/channels/chan_sip.c</span><br><span>@@ -17384,6 +17384,8 @@</span><br><span> if (!network_change_sub) {</span><br><span> network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span> network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 
100%, 40%);">+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>@@ -17397,6 +17399,8 @@</span><br><span> if (!acl_change_sub) {</span><br><span> acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span> acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> </span><br><span> }</span><br><span>@@ -28163,6 +28167,9 @@</span><br><span> mailbox_specific_topic = ast_mwi_topic(mailbox->id);</span><br><span> if (mailbox_specific_topic) {</span><br><span> mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> }</span><br><span>diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c</span><br><span>index 43cc32a..9c63da9 100644</span><br><span>--- a/channels/chan_skinny.c</span><br><span>+++ b/channels/chan_skinny.c</span><br><span>@@ -8330,6 +8330,8 @@</span><br><span> mailbox_specific_topic = ast_mwi_topic(l->mailbox);</span><br><span> if (mailbox_specific_topic) {</span><br><span> l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);</span><br><span style="color: hsl(120, 100%, 40%);">+ 
stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/channels/sig_pri.c b/channels/sig_pri.c</span><br><span>index f371fbf..682abf3 100644</span><br><span>--- a/channels/sig_pri.c</span><br><span>+++ b/channels/sig_pri.c</span><br><span>@@ -9143,6 +9143,9 @@</span><br><span> if (!pri->mbox[i].sub) {</span><br><span> ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",</span><br><span> sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);</span><br><span style="color: hsl(120, 100%, 40%);">+ } else {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> #if defined(HAVE_PRI_MWI_V2)</span><br><span> if (ast_strlen_zero(pri->mbox[i].vm_number)) {</span><br><span>diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h</span><br><span>index 25faa46..a9d5a74 100644</span><br><span>--- a/include/asterisk/stasis.h</span><br><span>+++ b/include/asterisk/stasis.h</span><br><span>@@ -294,6 +294,15 @@</span><br><span> };</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Stasis subscription message filters</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+enum stasis_subscription_message_filter {</span><br><span style="color: hsl(120, 100%, 40%);">+ STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */</span><br><span style="color: hsl(120, 100%, 40%);">+ 
STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */</span><br><span style="color: hsl(120, 100%, 40%);">+ STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */</span><br><span style="color: hsl(120, 100%, 40%);">+};</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Create a new message type.</span><br><span> *</span><br><span> * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done</span><br><span>@@ -329,6 +338,14 @@</span><br><span> unsigned int stasis_message_type_hash(const struct stasis_message_type *type);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Gets the id of a given message type</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The type to get the id of.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \return The id</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_message_type_id(const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Check whether a message type is declined</span><br><span> *</span><br><span> * \param name The name of the message type to check</span><br><span>@@ -501,6 +518,14 @@</span><br><span> const char *stasis_topic_name(const struct stasis_topic *topic);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Return the number of subscribers of a topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param topic Topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \return Number of subscribers of the 
topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+size_t stasis_topic_subscribers(const struct stasis_topic *topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Publish a message to a topic's subscribers.</span><br><span> * \param topic Topic.</span><br><span> * \param message Message to publish.</span><br><span>@@ -569,6 +594,10 @@</span><br><span> * \return New \ref stasis_subscription object.</span><br><span> * \return \c NULL on error.</span><br><span> * \since 12</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note This callback will receive a callback with a message indicating it</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been subscribed. This occurs immediately before accepted message</span><br><span style="color: hsl(120, 100%, 40%);">+ * types can be set and the callback must expect to receive it.</span><br><span> */</span><br><span> struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,</span><br><span> stasis_subscription_cb callback, void *data);</span><br><span>@@ -594,11 +623,69 @@</span><br><span> * \return New \ref stasis_subscription object.</span><br><span> * \return \c NULL on error.</span><br><span> * \since 12.8.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note This callback will receive a callback with a message indicating it</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been subscribed. 
This occurs immediately before accepted message</span><br><span style="color: hsl(120, 100%, 40%);">+ * types can be set and the callback must expect to receive it.</span><br><span> */</span><br><span> struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,</span><br><span> stasis_subscription_cb callback, void *data);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a subscription that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the subscription to allow the given message type to be</span><br><span style="color: hsl(120, 100%, 40%);">+ * raised to our subscription callback. This enables internal filtering in</span><br><span style="color: hsl(120, 100%, 40%);">+ * the stasis message bus to reduce messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription to add message type to.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note If you are wanting to use stasis_final_message you will need to accept</span><br><span style="color: hsl(120, 100%, 40%);">+ * \ref stasis_subscription_change_type as a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note Until the subscription is set to selective filtering it is possible for it</span><br><span style="color: 
hsl(120, 100%, 40%);">+ * to receive messages of message types that would not normally be accepted.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a subscription that we are not interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription to remove message type from.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we don't wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set the message type filtering level on a subscription</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the subscription to filter messages according to the</span><br><span style="color: hsl(120, 100%, 40%);">+ * provided filter level. 
For example if selective is used then only</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages matching those provided to \ref stasis_subscription_accept_message_type</span><br><span style="color: hsl(120, 100%, 40%);">+ * will be raised to the subscription callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription that should receive all messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param filter What filter to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_set_filter(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Cancel a subscription.</span><br><span> *</span><br><span> * Note that in an asynchronous system, there may still be messages queued or</span><br><span>@@ -1053,6 +1140,41 @@</span><br><span> struct stasis_caching_topic *caching_topic);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a caching topic that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the caching topic to receive messages of the given message</span><br><span style="color: hsl(120, 100%, 40%);">+ * type. 
This enables internal filtering in the stasis message bus to reduce</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param caching_topic The caching topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set the message type filtering level on a cache</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the underlying subscription to filter messages according to the</span><br><span style="color: hsl(120, 100%, 40%);">+ * provided filter level. 
For example if selective is used then only</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages matching those provided to \ref stasis_subscription_accept_message_type</span><br><span style="color: hsl(120, 100%, 40%);">+ * will be raised to the subscription callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param caching_topic The caching topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param filter What filter to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief A message which instructs the caching topic to remove an entry from</span><br><span> * its cache.</span><br><span> *</span><br><span>diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h</span><br><span>index e61d3e9..514d62e 100644</span><br><span>--- a/include/asterisk/stasis_cache_pattern.h</span><br><span>+++ b/include/asterisk/stasis_cache_pattern.h</span><br><span>@@ -169,4 +169,39 @@</span><br><span> struct stasis_topic *stasis_cp_single_topic_cached(</span><br><span> struct stasis_cp_single *one);</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to an instance that we are interested in a message type.</span><br><span 
style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the caching topic to receive messages of the given message</span><br><span style="color: hsl(120, 100%, 40%);">+ * type. This enables internal filtering in the stasis message bus to reduce</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param one One side of the cache pattern.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set the message type filtering level on a cache</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the underlying subscription to filter messages according to the</span><br><span style="color: hsl(120, 100%, 40%);">+ * provided filter level. 
For example if selective is used then only</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages matching those provided to \ref stasis_subscription_accept_message_type</span><br><span style="color: hsl(120, 100%, 40%);">+ * will be raised to the subscription callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param one One side of the cache pattern.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param filter What filter to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_set_filter(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */</span><br><span>diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h</span><br><span>index 50270a7..8dcdfcc 100644</span><br><span>--- a/include/asterisk/stasis_message_router.h</span><br><span>+++ b/include/asterisk/stasis_message_router.h</span><br><span>@@ -233,6 +233,10 @@</span><br><span> * \retval -1 on failure</span><br><span> *</span><br><span> * \since 12</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note Setting a default callback will automatically cause the underlying</span><br><span style="color: hsl(120, 100%, 40%);">+ * subscription to receive all messages and not be filtered. 
If filtering is</span><br><span style="color: hsl(120, 100%, 40%);">+ * desired then a specific route for each message type should be provided.</span><br><span> */</span><br><span> int stasis_message_router_set_default(struct stasis_message_router *router,</span><br><span> stasis_subscription_cb callback,</span><br><span>diff --git a/main/ccss.c b/main/ccss.c</span><br><span>index 7ff77fd..205dc1b 100644</span><br><span>--- a/main/ccss.c</span><br><span>+++ b/main/ccss.c</span><br><span>@@ -1439,6 +1439,8 @@</span><br><span> cc_unref(generic_list, "Failed to subscribe to device state");</span><br><span> return NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> generic_list->current_state = ast_device_state(monitor->interface->device_name);</span><br><span> ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");</span><br><span> return generic_list;</span><br><span>@@ -2810,6 +2812,9 @@</span><br><span> if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> cc_ref(agent, "Ref agent for subscription");</span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/devicestate.c 
b/main/devicestate.c</span><br><span>index b665045..6706725 100644</span><br><span>--- a/main/devicestate.c</span><br><span>+++ b/main/devicestate.c</span><br><span>@@ -945,6 +945,8 @@</span><br><span> if (!device_state_topic_cached) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),</span><br><span> devstate_change_cb, NULL);</span><br><span>@@ -952,6 +954,8 @@</span><br><span> ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/endpoints.c b/main/endpoints.c</span><br><span>index 88506a4..69d022f 100644</span><br><span>--- a/main/endpoints.c</span><br><span>+++ b/main/endpoints.c</span><br><span>@@ -204,7 +204,7 @@</span><br><span> endpoint_publish_snapshot(endpoint);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-static void endpoint_default(void *data,</span><br><span style="color: hsl(120, 100%, 40%);">+static void endpoint_subscription_change(void *data,</span><br><span> struct stasis_subscription *sub,</span><br><span> struct stasis_message *message)</span><br><span> {</span><br><span>@@ -265,6 +265,8 @@</span><br><span> if (!endpoint->topics) 
{</span><br><span> return NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));</span><br><span> if (!endpoint->router) {</span><br><span>@@ -273,8 +275,9 @@</span><br><span> r |= stasis_message_router_add(endpoint->router,</span><br><span> stasis_cache_clear_type(), endpoint_cache_clear,</span><br><span> endpoint);</span><br><span style="color: hsl(0, 100%, 40%);">- r |= stasis_message_router_set_default(endpoint->router,</span><br><span style="color: hsl(0, 100%, 40%);">- endpoint_default, endpoint);</span><br><span style="color: hsl(120, 100%, 40%);">+ r |= stasis_message_router_add(endpoint->router,</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_change_type(), endpoint_subscription_change,</span><br><span style="color: hsl(120, 100%, 40%);">+ endpoint);</span><br><span> if (r) {</span><br><span> return NULL;</span><br><span> }</span><br><span>@@ -290,6 +293,8 @@</span><br><span> if (!endpoint->topics) {</span><br><span> return NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> ao2_link(tech_endpoints, endpoint);</span><br><span> }</span><br><span>diff --git a/main/manager.c b/main/manager.c</span><br><span>index e5ca571..5b4cc3a 100644</span><br><span>--- a/main/manager.c</span><br><span>+++ b/main/manager.c</span><br><span>@@ -1521,6 +1521,8 @@</span><br><span> if (!acl_change_sub) 
{</span><br><span> acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span> acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/main/pbx.c b/main/pbx.c</span><br><span>index c87496b..434173d 100644</span><br><span>--- a/main/pbx.c</span><br><span>+++ b/main/pbx.c</span><br><span>@@ -8339,10 +8339,15 @@</span><br><span> if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/presencestate.c b/main/presencestate.c</span><br><span>index 56c903c..ff4934a 100644</span><br><span>--- a/main/presencestate.c</span><br><span>+++ 
b/main/presencestate.c</span><br><span>@@ -389,6 +389,8 @@</span><br><span> if (!presence_state_topic_cached) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/stasis.c b/main/stasis.c</span><br><span>index 26e404c..d054897 100644</span><br><span>--- a/main/stasis.c</span><br><span>+++ b/main/stasis.c</span><br><span>@@ -372,6 +372,11 @@</span><br><span> return topic->name;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+size_t stasis_topic_subscribers(const struct stasis_topic *topic)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_VECTOR_SIZE(&topic->subscribers);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \internal */</span><br><span> struct stasis_subscription {</span><br><span> /*! Unique ID for this subscription */</span><br><span>@@ -393,6 +398,11 @@</span><br><span> /*! Flag set when final message for sub has been processed.</span><br><span> * Be sure join_lock is held before reading/setting. */</span><br><span> int final_message_processed;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! The message types this subscription is accepting */</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR(, char) accepted_message_types;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! 
The message filter currently in use */</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter;</span><br><span> };</span><br><span> </span><br><span> static void subscription_dtor(void *obj)</span><br><span>@@ -411,6 +421,8 @@</span><br><span> ast_taskprocessor_unreference(sub->mailbox);</span><br><span> sub->mailbox = NULL;</span><br><span> ast_cond_destroy(&sub->join_cond);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_FREE(&sub->accepted_message_types);</span><br><span> }</span><br><span> </span><br><span> /*!</span><br><span>@@ -422,19 +434,25 @@</span><br><span> static void subscription_invoke(struct stasis_subscription *sub,</span><br><span> struct stasis_message *message)</span><br><span> {</span><br><span style="color: hsl(120, 100%, 40%);">+ unsigned int final = stasis_subscription_final_message(sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ int message_type_id = stasis_message_type_id(stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /* Notify that the final message has been received */</span><br><span style="color: hsl(0, 100%, 40%);">- if (stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (final) {</span><br><span> ao2_lock(sub);</span><br><span> sub->final_message_rxed = 1;</span><br><span> ast_cond_signal(&sub->join_cond);</span><br><span> ao2_unlock(sub);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- /* Since sub is mostly immutable, no need to lock sub */</span><br><span style="color: hsl(0, 100%, 40%);">- sub->callback(sub->data, sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||</span><br><span style="color: hsl(120, 100%, 40%);">+ (message_type_id < 
AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Since sub is mostly immutable, no need to lock sub */</span><br><span style="color: hsl(120, 100%, 40%);">+ sub->callback(sub->data, sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> </span><br><span> /* Notify that the final message has been processed */</span><br><span style="color: hsl(0, 100%, 40%);">- if (stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (final) {</span><br><span> ao2_lock(sub);</span><br><span> sub->final_message_processed = 1;</span><br><span> ast_cond_signal(&sub->join_cond);</span><br><span>@@ -502,6 +520,8 @@</span><br><span> sub->callback = callback;</span><br><span> sub->data = data;</span><br><span> ast_cond_init(&sub->join_cond, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_INIT(&sub->accepted_message_types, 0);</span><br><span> </span><br><span> if (topic_add_subscription(topic, sub) != 0) {</span><br><span> ao2_ref(sub, -1);</span><br><span>@@ -588,6 +608,76 @@</span><br><span> return res;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(type != 
NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(stasis_message_type_name(type) != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!type || !stasis_message_type_name(type)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Filtering is unreliable as this message type is not yet initialized</span><br><span style="color: hsl(120, 100%, 40%);">+ * so force all messages through.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* We do this for the same reason as above. 
The subscription can still operate, so allow</span><br><span style="color: hsl(120, 100%, 40%);">+ * it to do so by forcing all messages through.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(type != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(stasis_message_type_name(type) != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!type || !stasis_message_type_name(type)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {</span><br><span 
style="color: hsl(120, 100%, 40%);">+ /* The memory is already allocated so this can't fail */</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_set_filter(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {</span><br><span style="color: hsl(120, 100%, 40%);">+ subscription->filter = filter;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> void stasis_subscription_join(struct stasis_subscription *subscription)</span><br><span> {</span><br><span> if (subscription) {</span><br><span>@@ -783,6 +873,18 
@@</span><br><span> struct stasis_message *message,</span><br><span> int synchronous)</span><br><span> {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Determine if this subscription is interested in this message. Note that final</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages are special and are always invoked on the subscription.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {</span><br><span style="color: hsl(120, 100%, 40%);">+ int message_type_id = stasis_message_type_id(stasis_message_type(message));</span><br><span style="color: hsl(120, 100%, 40%);">+ if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||</span><br><span style="color: hsl(120, 100%, 40%);">+ !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&</span><br><span style="color: hsl(120, 100%, 40%);">+ !stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> if (!sub->mailbox) {</span><br><span> /* Dispatch directly */</span><br><span> subscription_invoke(sub, message);</span><br><span>@@ -842,6 +944,11 @@</span><br><span> ast_assert(topic != NULL);</span><br><span> ast_assert(message != NULL);</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ /* If there are no subscribers don't bother */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!stasis_topic_subscribers(topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*</span><br><span> * The topic may be unref'ed by the subscription invocation.</span><br><span> 
* Make sure we hold onto a reference while dispatching.</span><br><span>diff --git a/main/stasis_cache.c b/main/stasis_cache.c</span><br><span>index 1adfc0e..fd560b0 100644</span><br><span>--- a/main/stasis_cache.c</span><br><span>+++ b/main/stasis_cache.c</span><br><span>@@ -89,6 +89,35 @@</span><br><span> return caching_topic->topic;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ int res;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!caching_topic) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* We wait to accept the stasis specific message types until now so that by default everything</span><br><span style="color: hsl(120, 100%, 40%);">+ * will flow to us.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ res |= stasis_subscription_accept_message_type(caching_topic->sub, type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return res;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: 
hsl(120, 100%, 40%);">+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!caching_topic) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ return stasis_subscription_set_filter(caching_topic->sub, filter);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)</span><br><span> {</span><br><span> if (!caching_topic) {</span><br><span>@@ -858,11 +887,13 @@</span><br><span> /* Update the cache */</span><br><span> snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);</span><br><span> if (snapshots.old || msg_put) {</span><br><span style="color: hsl(0, 100%, 40%);">- update = update_create(snapshots.old, msg_put);</span><br><span style="color: hsl(0, 100%, 40%);">- if (update) {</span><br><span style="color: hsl(0, 100%, 40%);">- stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (stasis_topic_subscribers(caching_topic->topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ update = update_create(snapshots.old, msg_put);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (update) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_ref(update, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> }</span><br><span style="color: hsl(0, 100%, 40%);">- 
ao2_cleanup(update);</span><br><span> } else {</span><br><span> ast_debug(1,</span><br><span> "Attempting to remove an item from the %s cache that isn't there: %s %s\n",</span><br><span>@@ -875,11 +906,13 @@</span><br><span> caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,</span><br><span> snapshots.aggregate_new);</span><br><span> }</span><br><span style="color: hsl(0, 100%, 40%);">- update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);</span><br><span style="color: hsl(0, 100%, 40%);">- if (update) {</span><br><span style="color: hsl(0, 100%, 40%);">- stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (stasis_topic_subscribers(caching_topic->topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (update) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_ref(update, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> }</span><br><span style="color: hsl(0, 100%, 40%);">- ao2_cleanup(update);</span><br><span> }</span><br><span> </span><br><span> ao2_cleanup(snapshots.old);</span><br><span>diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c</span><br><span>index 2a2ea44..c2ea766 100644</span><br><span>--- a/main/stasis_cache_pattern.c</span><br><span>+++ b/main/stasis_cache_pattern.c</span><br><span>@@ -219,3 +219,21 @@</span><br><span> }</span><br><span> return stasis_caching_get_topic(one->topic_cached);</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type 
*type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!one) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ return stasis_caching_accept_message_type(one->topic_cached, type);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_set_filter(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!one) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ return stasis_caching_set_filter(one->topic_cached, filter);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span>diff --git a/main/stasis_message.c b/main/stasis_message.c</span><br><span>index ef03d13..2685a43 100644</span><br><span>--- a/main/stasis_message.c</span><br><span>+++ b/main/stasis_message.c</span><br><span>@@ -41,9 +41,11 @@</span><br><span> struct stasis_message_vtable *vtable;</span><br><span> char *name;</span><br><span> unsigned int hash;</span><br><span style="color: hsl(120, 100%, 40%);">+ int id;</span><br><span> };</span><br><span> </span><br><span> static struct stasis_message_vtable null_vtable = {};</span><br><span style="color: hsl(120, 100%, 40%);">+static int message_type_id;</span><br><span> </span><br><span> static void message_type_dtor(void *obj)</span><br><span> {</span><br><span>@@ -80,6 +82,7 @@</span><br><span> }</span><br><span> type->hash = ast_hashtab_hash_string(name);</span><br><span> type->vtable = vtable;</span><br><span 
style="color: hsl(120, 100%, 40%);">+ type->id = ast_atomic_fetchadd_int(&message_type_id, +1);</span><br><span> *result = type;</span><br><span> </span><br><span> return STASIS_MESSAGE_TYPE_SUCCESS;</span><br><span>@@ -95,6 +98,11 @@</span><br><span> return type->hash;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_message_type_id(const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return type->id;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \internal */</span><br><span> struct stasis_message {</span><br><span> /*! Time the message was created */</span><br><span>diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c</span><br><span>index 498ddd6..e9aebe8 100644</span><br><span>--- a/main/stasis_message_router.c</span><br><span>+++ b/main/stasis_message_router.c</span><br><span>@@ -237,6 +237,9 @@</span><br><span> return NULL;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ /* We need to receive subscription change messages so we know when our subscription goes away */</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> return router;</span><br><span> }</span><br><span> </span><br><span>@@ -318,6 +321,14 @@</span><br><span> }</span><br><span> ao2_lock(router);</span><br><span> res = route_table_add(&router->routes, message_type, callback, data);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!res) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(router->subscription, message_type);</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Until a 
specific message type was added we would already drop the message, so being</span><br><span style="color: hsl(120, 100%, 40%);">+ * selective now doesn't harm us. If we have a default route then we are already forced</span><br><span style="color: hsl(120, 100%, 40%);">+ * to filter nothing and messages will come in regardless.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> ao2_unlock(router);</span><br><span> return res;</span><br><span> }</span><br><span>@@ -336,6 +347,10 @@</span><br><span> }</span><br><span> ao2_lock(router);</span><br><span> res = route_table_add(&router->cache_routes, message_type, callback, data);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!res) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> ao2_unlock(router);</span><br><span> return res;</span><br><span> }</span><br><span>@@ -380,6 +395,9 @@</span><br><span> router->default_route.callback = callback;</span><br><span> router->default_route.data = data;</span><br><span> ao2_unlock(router);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /* While this implementation can never fail, it used to be able to */</span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/res/parking/parking_applications.c 
b/res/parking/parking_applications.c</span><br><span>index 99999c9..fea598c 100644</span><br><span>--- a/res/parking/parking_applications.c</span><br><span>+++ b/res/parking/parking_applications.c</span><br><span>@@ -870,6 +870,10 @@</span><br><span> return;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_parked_call_type() != stasis_message_type(message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> if (payload->event_type != PARKED_CALL) {</span><br><span> /* We are only concerned with calls parked */</span><br><span> return;</span><br><span>@@ -956,6 +960,10 @@</span><br><span> return -1;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /* Now for the fun part... park it! 
*/</span><br><span> ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);</span><br><span> </span><br><span>diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c</span><br><span>index b4884db..cf5cc72 100644</span><br><span>--- a/res/parking/parking_bridge_features.c</span><br><span>+++ b/res/parking/parking_bridge_features.c</span><br><span>@@ -195,6 +195,9 @@</span><br><span> if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> datastore->data = parked_datastore;</span><br><span> </span><br><span>diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c</span><br><span>index ed28164..5919b99 100644</span><br><span>--- a/res/parking/parking_manager.c</span><br><span>+++ b/res/parking/parking_manager.c</span><br><span>@@ -688,6 +688,8 @@</span><br><span> {</span><br><span> if (!parking_sub) {</span><br><span> parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_hep_rtcp.c 
b/res/res_hep_rtcp.c</span><br><span>index d799b46..418b63e 100644</span><br><span>--- a/res/res_hep_rtcp.c</span><br><span>+++ b/res/res_hep_rtcp.c</span><br><span>@@ -169,6 +169,9 @@</span><br><span> if (!stasis_rtp_subscription) {</span><br><span> return AST_MODULE_LOAD_DECLINE;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span>diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c</span><br><span>index f2ddf57..f8c2392 100644</span><br><span>--- a/res/res_pjsip_mwi.c</span><br><span>+++ b/res/res_pjsip_mwi.c</span><br><span>@@ -269,6 +269,9 @@</span><br><span> ao2_ref(mwi_sub, -1);</span><br><span> mwi_stasis_sub = NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> return mwi_stasis_sub;</span><br><span> }</span><br><span> </span><br><span>@@ -1366,7 +1369,11 @@</span><br><span> if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {</span><br><span> ast_sip_push_task(NULL, send_initial_notify_all, NULL);</span><br><span> } else {</span><br><span style="color: hsl(0, 100%, 40%);">- 
stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_subscription *sub;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c</span><br><span>index 4d80437..c42f59e 100644</span><br><span>--- a/res/res_pjsip_outbound_registration.c</span><br><span>+++ b/res/res_pjsip_outbound_registration.c</span><br><span>@@ -2285,6 +2285,8 @@</span><br><span> </span><br><span> network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span> network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span>diff --git a/res/res_pjsip_publish_asterisk.c b/res/res_pjsip_publish_asterisk.c</span><br><span>index 53ee60f..2271d8b 100644</span><br><span>--- a/res/res_pjsip_publish_asterisk.c</span><br><span>+++ b/res/res_pjsip_publish_asterisk.c</span><br><span>@@ -360,6 +360,9 @@</span><br><span> ao2_ref(datastore, -1);</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ 
stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> cached = stasis_cache_dump(ast_device_state_cache(), NULL);</span><br><span> ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);</span><br><span>@@ -435,6 +438,9 @@</span><br><span> ao2_ref(datastore, -1);</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);</span><br><span> ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);</span><br><span>diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c</span><br><span>index 1c0145c..eb4545b 100644</span><br><span>--- a/res/res_pjsip_pubsub.c</span><br><span>+++ b/res/res_pjsip_pubsub.c</span><br><span>@@ -5634,7 +5634,11 @@</span><br><span> if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {</span><br><span> ast_sip_push_task(NULL, subscription_persistence_load, NULL);</span><br><span> } else {</span><br><span style="color: hsl(0, 100%, 40%);">- stasis_subscribe_pool(ast_manager_get_topic(), 
subscription_persistence_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_subscription *sub;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> </span><br><span> ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,</span><br><span>diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c</span><br><span>index b45b5a5..9b35c6a 100644</span><br><span>--- a/res/res_pjsip_refer.c</span><br><span>+++ b/res/res_pjsip_refer.c</span><br><span>@@ -686,6 +686,10 @@</span><br><span> ast_channel_unlock(chan);</span><br><span> </span><br><span> ao2_cleanup(refer->progress);</span><br><span style="color: hsl(120, 100%, 40%);">+ } else {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_security_log.c b/res/res_security_log.c</span><br><span>index c3fb3cf..760d155 100644</span><br><span>--- a/res/res_security_log.c</span><br><span>+++ b/res/res_security_log.c</span><br><span>@@ -143,6 +143,8 @@</span><br><span> LOG_SECURITY = -1;</span><br><span> return 
AST_MODULE_LOAD_DECLINE;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> ast_verb(3, "Security Logging Enabled\n");</span><br><span> </span><br><span>diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c</span><br><span>index 34c77d9..fbdfb3d 100644</span><br><span>--- a/res/res_stasis_device_state.c</span><br><span>+++ b/res/res_stasis_device_state.c</span><br><span>@@ -396,6 +396,9 @@</span><br><span> ao2_ref(sub, -1);</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);</span><br><span> ao2_unlock(device_state_subscriptions);</span><br><span>diff --git a/res/res_xmpp.c b/res/res_xmpp.c</span><br><span>index 41f8996..a85892f 100644</span><br><span>--- a/res/res_xmpp.c</span><br><span>+++ b/res/res_xmpp.c</span><br><span>@@ -1628,11 +1628,15 @@</span><br><span> if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {</span><br><span> return;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(client->mwi_sub, 
STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {</span><br><span> client->mwi_sub = stasis_unsubscribe(client->mwi_sub);</span><br><span> return;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> cached = stasis_cache_dump(ast_device_state_cache(), NULL);</span><br><span> ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/10640">change 10640</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/10640"/><meta itemprop="name" content="View Change"/></div></div>
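<p>Every hunk above follows the same pattern: subscribe, register the accepted message types with stasis_subscription_accept_message_type(), then switch the subscription to STASIS_SUBSCRIPTION_FILTER_SELECTIVE. The sketch below is a standalone model of that idea and is not the Asterisk implementation. Every name prefixed with demo_ or DEMO_ is hypothetical, and the linear lookup is only an assumption about how a pre-dispatch check could work.</p>

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration only; not the Stasis API. */
enum demo_filter { DEMO_FILTER_NONE, DEMO_FILTER_SELECTIVE };
enum demo_type { DEMO_TYPE_PARKED_CALL, DEMO_TYPE_SUBSCRIPTION_CHANGE, DEMO_TYPE_OTHER };

#define DEMO_MAX_ACCEPTED 4

struct demo_subscription {
    enum demo_filter filter;                    /* current filtering mode */
    enum demo_type accepted[DEMO_MAX_ACCEPTED]; /* types the subscriber asked for */
    size_t accepted_count;
    int delivered;                              /* messages that reached the callback */
};

/* Record a message type the subscriber wants; returns -1 when the list is full. */
static int demo_accept_message_type(struct demo_subscription *sub, enum demo_type type)
{
    if (sub->accepted_count >= DEMO_MAX_ACCEPTED) {
        return -1;
    }
    sub->accepted[sub->accepted_count++] = type;
    return 0;
}

/* Decide before dispatch whether the subscriber should see this type. */
static bool demo_should_dispatch(const struct demo_subscription *sub, enum demo_type type)
{
    size_t i;

    if (sub->filter != DEMO_FILTER_SELECTIVE) {
        return true; /* no selective filter: everything is delivered */
    }
    for (i = 0; i < sub->accepted_count; i++) {
        if (sub->accepted[i] == type) {
            return true;
        }
    }
    return false;
}

/* Model of a topic publishing one message to one subscriber. */
static void demo_publish(struct demo_subscription *sub, enum demo_type type)
{
    if (demo_should_dispatch(sub, type)) {
        sub->delivered++;
    }
}

/* Selective subscriber: only the accepted type gets through. */
static int demo_selective_delivered(void)
{
    struct demo_subscription sub = { .filter = DEMO_FILTER_NONE };
    int rc = demo_accept_message_type(&sub, DEMO_TYPE_PARKED_CALL);

    assert(rc == 0);
    (void) rc;
    sub.filter = DEMO_FILTER_SELECTIVE;

    demo_publish(&sub, DEMO_TYPE_PARKED_CALL);
    demo_publish(&sub, DEMO_TYPE_OTHER);
    demo_publish(&sub, DEMO_TYPE_PARKED_CALL);
    return sub.delivered;
}

/* Types were accepted but the filter was never set: all messages arrive. */
static int demo_unfiltered_delivered(void)
{
    struct demo_subscription sub = { .filter = DEMO_FILTER_NONE };
    int rc = demo_accept_message_type(&sub, DEMO_TYPE_PARKED_CALL);

    assert(rc == 0);
    (void) rc;

    demo_publish(&sub, DEMO_TYPE_PARKED_CALL);
    demo_publish(&sub, DEMO_TYPE_OTHER);
    demo_publish(&sub, DEMO_TYPE_PARKED_CALL);
    return sub.delivered;
}
```

<p>Calling demo_selective_delivered() and demo_unfiltered_delivered() from a main() shows the difference between the two modes. Several hunks above also accept stasis_subscription_change_type() alongside the payload type, so a model of those subscribers would register DEMO_TYPE_SUBSCRIPTION_CHANGE as well.</p>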
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 13 </div>
<div style="display:none"> Gerrit-MessageType: merged </div>
<div style="display:none"> Gerrit-Change-Id: I99bee23895baa0a117985d51683f7963b77aa190 </div>
<div style="display:none"> Gerrit-Change-Number: 10640 </div>
<div style="display:none"> Gerrit-PatchSet: 3 </div>
<div style="display:none"> Gerrit-Owner: Joshua Colp &lt;jcolp@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Benjamin Keith Ford &lt;bford@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Corey Farrell &lt;git@cfware.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: George Joseph &lt;gjoseph@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Jenkins2 (1000185) </div>
<div style="display:none"> Gerrit-Reviewer: Joshua Colp &lt;jcolp@digium.com&gt; </div>