<p>Joshua Colp <strong>merged</strong> this change.</p><p><a href="https://gerrit.asterisk.org/10639">View Change</a></p><div style="white-space:pre-wrap">Approvals:
Corey Farrell: Looks good to me, but someone else must approve
George Joseph: Looks good to me, approved
Joshua Colp: Approved for Submit
</div><pre style="font-family: monospace,monospace; white-space: pre-wrap;">stasis: Add internal filtering of messages.<br><br>This change adds the ability for subscriptions to indicate<br>which message types they are interested in accepting. The<br>filtering is then done before a message is dispatched to<br>the subscriber, reducing the amount of work that has to be<br>done.<br><br>This is optional: if a subscriber does not both add the<br>message types it wishes to accept and set the subscription<br>to selective filtering, the previous behavior is preserved<br>and it receives all messages.<br><br>There is also the ability to explicitly force the reception<br>of all messages for cases such as AMI or ARI, where a large<br>number of messages are expected and are then generically<br>converted into a different format.<br><br>ASTERISK-28103<br><br>Change-Id: I99bee23895baa0a117985d51683f7963b77aa190<br>---<br>M apps/app_queue.c<br>M channels/chan_dahdi.c<br>M channels/chan_iax2.c<br>M channels/chan_mgcp.c<br>M channels/chan_sip.c<br>M channels/chan_skinny.c<br>M channels/sig_pri.c<br>M include/asterisk/stasis.h<br>M include/asterisk/stasis_cache_pattern.h<br>M include/asterisk/stasis_message_router.h<br>M main/ccss.c<br>M main/devicestate.c<br>M main/endpoints.c<br>M main/manager.c<br>M main/pbx.c<br>M main/presencestate.c<br>M main/stasis.c<br>M main/stasis_cache.c<br>M main/stasis_cache_pattern.c<br>M main/stasis_message.c<br>M main/stasis_message_router.c<br>M res/parking/parking_applications.c<br>M res/parking/parking_bridge_features.c<br>M res/parking/parking_manager.c<br>M res/res_hep_rtcp.c<br>M res/res_pjsip_mwi.c<br>M res/res_pjsip_outbound_registration.c<br>M res/res_pjsip_publish_asterisk.c<br>M res/res_pjsip_pubsub.c<br>M res/res_pjsip_refer.c<br>M res/res_security_log.c<br>M res/res_stasis_device_state.c<br>M res/res_xmpp.c<br>33 files changed, 457 insertions(+), 17 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: 
pre-wrap;"><span>diff --git a/apps/app_queue.c b/apps/app_queue.c</span><br><span>index 80c253f..b299889 100644</span><br><span>--- a/apps/app_queue.c</span><br><span>+++ b/apps/app_queue.c</span><br><span>@@ -11336,6 +11336,8 @@</span><br><span> if (!device_state_sub) {</span><br><span> err = -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> manager_topic = ast_manager_get_topic();</span><br><span> queue_topic = ast_queue_topic_all();</span><br><span>diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c</span><br><span>index f4f6514..1eb618b 100644</span><br><span>--- a/channels/chan_dahdi.c</span><br><span>+++ b/channels/chan_dahdi.c</span><br><span>@@ -12594,6 +12594,8 @@</span><br><span> * knows that we care about it. Then, chan_dahdi will get the MWI from the</span><br><span> * event cache instead of checking the mailbox directly. 
*/</span><br><span> tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> #ifdef HAVE_DAHDI_LINEREVERSE_VMWI</span><br><span>diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c</span><br><span>index 01d42b5..0ca4234 100644</span><br><span>--- a/channels/chan_iax2.c</span><br><span>+++ b/channels/chan_iax2.c</span><br><span>@@ -1456,6 +1456,8 @@</span><br><span> if (!network_change_sub) {</span><br><span> network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span> network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>@@ -1469,6 +1471,8 @@</span><br><span> if (!acl_change_sub) {</span><br><span> acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span> acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>@@ -13072,6 +13076,8 @@</span><br><span> * mailboxes. However, we just grab the events out of the cache when it</span><br><span> * is time to send MWI, since it is only sent with a REGACK. 
*/</span><br><span> peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c</span><br><span>index 2ac7690..46342ce 100644</span><br><span>--- a/channels/chan_mgcp.c</span><br><span>+++ b/channels/chan_mgcp.c</span><br><span>@@ -4242,6 +4242,8 @@</span><br><span> * knows that we care about it. Then, chan_mgcp will get the MWI from the</span><br><span> * event cache instead of checking the mailbox directly. */</span><br><span> e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());</span><br><span>diff --git a/channels/chan_sip.c b/channels/chan_sip.c</span><br><span>index 2693e5f..9089b5e 100644</span><br><span>--- a/channels/chan_sip.c</span><br><span>+++ b/channels/chan_sip.c</span><br><span>@@ -17494,6 +17494,8 @@</span><br><span> if (!network_change_sub) {</span><br><span> network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span> network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 
100%, 40%);">+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>@@ -17507,6 +17509,8 @@</span><br><span> if (!acl_change_sub) {</span><br><span> acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span> acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> </span><br><span> }</span><br><span>@@ -28385,6 +28389,9 @@</span><br><span> mailbox_specific_topic = ast_mwi_topic(mailbox->id);</span><br><span> if (mailbox_specific_topic) {</span><br><span> mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> }</span><br><span>diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c</span><br><span>index 2b13e5e..910b7b8 100644</span><br><span>--- a/channels/chan_skinny.c</span><br><span>+++ b/channels/chan_skinny.c</span><br><span>@@ -8334,6 +8334,8 @@</span><br><span> mailbox_specific_topic = ast_mwi_topic(l->mailbox);</span><br><span> if (mailbox_specific_topic) {</span><br><span> l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);</span><br><span style="color: hsl(120, 100%, 40%);">+ 
stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/channels/sig_pri.c b/channels/sig_pri.c</span><br><span>index fbc4e40..ec6d666 100644</span><br><span>--- a/channels/sig_pri.c</span><br><span>+++ b/channels/sig_pri.c</span><br><span>@@ -9130,6 +9130,9 @@</span><br><span> if (!pri->mbox[i].sub) {</span><br><span> ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",</span><br><span> sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);</span><br><span style="color: hsl(120, 100%, 40%);">+ } else {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> #if defined(HAVE_PRI_MWI_V2)</span><br><span> if (ast_strlen_zero(pri->mbox[i].vm_number)) {</span><br><span>diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h</span><br><span>index 2b56b53..ebd00ee 100644</span><br><span>--- a/include/asterisk/stasis.h</span><br><span>+++ b/include/asterisk/stasis.h</span><br><span>@@ -292,6 +292,15 @@</span><br><span> };</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Stasis subscription message filters</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+enum stasis_subscription_message_filter {</span><br><span style="color: hsl(120, 100%, 40%);">+ STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */</span><br><span style="color: hsl(120, 100%, 40%);">+ 
STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */</span><br><span style="color: hsl(120, 100%, 40%);">+ STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */</span><br><span style="color: hsl(120, 100%, 40%);">+};</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Create a new message type.</span><br><span> *</span><br><span> * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done</span><br><span>@@ -327,6 +336,14 @@</span><br><span> unsigned int stasis_message_type_hash(const struct stasis_message_type *type);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Gets the id of a given message type</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The type to get the id of.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \return The id</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_message_type_id(const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Check whether a message type is declined</span><br><span> *</span><br><span> * \param name The name of the message type to check</span><br><span>@@ -495,6 +512,14 @@</span><br><span> const char *stasis_topic_name(const struct stasis_topic *topic);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Return the number of subscribers of a topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param topic Topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \return Number of subscribers of the 
topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+size_t stasis_topic_subscribers(const struct stasis_topic *topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Publish a message to a topic's subscribers.</span><br><span> * \param topic Topic.</span><br><span> * \param message Message to publish.</span><br><span>@@ -559,6 +584,10 @@</span><br><span> * \return New \ref stasis_subscription object.</span><br><span> * \return \c NULL on error.</span><br><span> * \since 12</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note This callback will receive a callback with a message indicating it</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been subscribed. This occurs immediately before accepted message</span><br><span style="color: hsl(120, 100%, 40%);">+ * types can be set and the callback must expect to receive it.</span><br><span> */</span><br><span> struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,</span><br><span> stasis_subscription_cb callback, void *data);</span><br><span>@@ -584,11 +613,69 @@</span><br><span> * \return New \ref stasis_subscription object.</span><br><span> * \return \c NULL on error.</span><br><span> * \since 12.8.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note This callback will receive a callback with a message indicating it</span><br><span style="color: hsl(120, 100%, 40%);">+ * has been subscribed. 
This occurs immediately before accepted message</span><br><span style="color: hsl(120, 100%, 40%);">+ * types can be set and the callback must expect to receive it.</span><br><span> */</span><br><span> struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,</span><br><span> stasis_subscription_cb callback, void *data);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a subscription that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the subscription to allow the given message type to be</span><br><span style="color: hsl(120, 100%, 40%);">+ * raised to our subscription callback. This enables internal filtering in</span><br><span style="color: hsl(120, 100%, 40%);">+ * the stasis message bus to reduce messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription to add message type to.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note If you are wanting to use stasis_final_message you will need to accept</span><br><span style="color: hsl(120, 100%, 40%);">+ * \ref stasis_subscription_change_type as a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note Until the subscription is set to selective filtering it is possible for it</span><br><span style="color: 
hsl(120, 100%, 40%);">+ * to receive messages of message types that would not normally be accepted.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a subscription that we are not interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription to remove message type from.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we don't wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set the message type filtering level on a subscription</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the subscription to filter messages according to the</span><br><span style="color: hsl(120, 100%, 40%);">+ * provided filter level. 
For example if selective is used then only</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages matching those provided to \ref stasis_subscription_accept_message_type</span><br><span style="color: hsl(120, 100%, 40%);">+ * will be raised to the subscription callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param subscription Subscription that should receive all messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param filter What filter to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_set_filter(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief Cancel a subscription.</span><br><span> *</span><br><span> * Note that in an asynchronous system, there may still be messages queued or</span><br><span>@@ -1037,6 +1124,41 @@</span><br><span> struct stasis_caching_topic *caching_topic);</span><br><span> </span><br><span> /*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to a caching topic that we are interested in a message type.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the caching topic to receive messages of the given message</span><br><span style="color: hsl(120, 100%, 40%);">+ * type. 
This enables internal filtering in the stasis message bus to reduce</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param caching_topic The caching topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set the message type filtering level on a cache</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the underlying subscription to filter messages according to the</span><br><span style="color: hsl(120, 100%, 40%);">+ * provided filter level. 
For example if selective is used then only</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages matching those provided to \ref stasis_subscription_accept_message_type</span><br><span style="color: hsl(120, 100%, 40%);">+ * will be raised to the subscription callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param caching_topic The caching topic.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param filter What filter to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span> * \brief A message which instructs the caching topic to remove an entry from</span><br><span> * its cache.</span><br><span> *</span><br><span>diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h</span><br><span>index e61d3e9..514d62e 100644</span><br><span>--- a/include/asterisk/stasis_cache_pattern.h</span><br><span>+++ b/include/asterisk/stasis_cache_pattern.h</span><br><span>@@ -169,4 +169,39 @@</span><br><span> struct stasis_topic *stasis_cp_single_topic_cached(</span><br><span> struct stasis_cp_single *one);</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Indicate to an instance that we are interested in a message type.</span><br><span 
style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the caching topic to receive messages of the given message</span><br><span style="color: hsl(120, 100%, 40%);">+ * type. This enables internal filtering in the stasis message bus to reduce</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param one One side of the cache pattern.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param type The message type we wish to receive.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+/*!</span><br><span style="color: hsl(120, 100%, 40%);">+ * \brief Set the message type filtering level on a cache</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * This will cause the underlying subscription to filter messages according to the</span><br><span style="color: hsl(120, 100%, 40%);">+ * provided filter level. 
For example if selective is used then only</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages matching those provided to \ref stasis_subscription_accept_message_type</span><br><span style="color: hsl(120, 100%, 40%);">+ * will be raised to the subscription callback.</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param one One side of the cache pattern.</span><br><span style="color: hsl(120, 100%, 40%);">+ * \param filter What filter to use</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval 0 on success</span><br><span style="color: hsl(120, 100%, 40%);">+ * \retval -1 failure</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \since 17.0.0</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_set_filter(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */</span><br><span>diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h</span><br><span>index 50270a7..8dcdfcc 100644</span><br><span>--- a/include/asterisk/stasis_message_router.h</span><br><span>+++ b/include/asterisk/stasis_message_router.h</span><br><span>@@ -233,6 +233,10 @@</span><br><span> * \retval -1 on failure</span><br><span> *</span><br><span> * \since 12</span><br><span style="color: hsl(120, 100%, 40%);">+ *</span><br><span style="color: hsl(120, 100%, 40%);">+ * \note Setting a default callback will automatically cause the underlying</span><br><span style="color: hsl(120, 100%, 40%);">+ * subscription to receive all messages and not be filtered. 
If filtering is</span><br><span style="color: hsl(120, 100%, 40%);">+ * desired then a specific route for each message type should be provided.</span><br><span> */</span><br><span> int stasis_message_router_set_default(struct stasis_message_router *router,</span><br><span> stasis_subscription_cb callback,</span><br><span>diff --git a/main/ccss.c b/main/ccss.c</span><br><span>index 5758574..52ec586 100644</span><br><span>--- a/main/ccss.c</span><br><span>+++ b/main/ccss.c</span><br><span>@@ -1433,6 +1433,8 @@</span><br><span> cc_unref(generic_list, "Failed to subscribe to device state");</span><br><span> return NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> generic_list->current_state = ast_device_state(monitor->interface->device_name);</span><br><span> ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");</span><br><span> return generic_list;</span><br><span>@@ -2804,6 +2806,9 @@</span><br><span> if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> cc_ref(agent, "Ref agent for subscription");</span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/devicestate.c 
b/main/devicestate.c</span><br><span>index 7dcbe82..b6c740c 100644</span><br><span>--- a/main/devicestate.c</span><br><span>+++ b/main/devicestate.c</span><br><span>@@ -920,6 +920,8 @@</span><br><span> if (!device_state_topic_cached) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),</span><br><span> devstate_change_cb, NULL);</span><br><span>@@ -927,6 +929,8 @@</span><br><span> ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/endpoints.c b/main/endpoints.c</span><br><span>index f1608f3..3129fb4 100644</span><br><span>--- a/main/endpoints.c</span><br><span>+++ b/main/endpoints.c</span><br><span>@@ -202,7 +202,7 @@</span><br><span> endpoint_publish_snapshot(endpoint);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-static void endpoint_default(void *data,</span><br><span style="color: hsl(120, 100%, 40%);">+static void endpoint_subscription_change(void *data,</span><br><span> struct stasis_subscription *sub,</span><br><span> struct stasis_message *message)</span><br><span> {</span><br><span>@@ -263,6 +263,8 @@</span><br><span> if (!endpoint->topics) 
{</span><br><span> return NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));</span><br><span> if (!endpoint->router) {</span><br><span>@@ -271,8 +273,9 @@</span><br><span> r |= stasis_message_router_add(endpoint->router,</span><br><span> stasis_cache_clear_type(), endpoint_cache_clear,</span><br><span> endpoint);</span><br><span style="color: hsl(0, 100%, 40%);">- r |= stasis_message_router_set_default(endpoint->router,</span><br><span style="color: hsl(0, 100%, 40%);">- endpoint_default, endpoint);</span><br><span style="color: hsl(120, 100%, 40%);">+ r |= stasis_message_router_add(endpoint->router,</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_change_type(), endpoint_subscription_change,</span><br><span style="color: hsl(120, 100%, 40%);">+ endpoint);</span><br><span> if (r) {</span><br><span> return NULL;</span><br><span> }</span><br><span>@@ -288,6 +291,8 @@</span><br><span> if (!endpoint->topics) {</span><br><span> return NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> ao2_link(tech_endpoints, endpoint);</span><br><span> }</span><br><span>diff --git a/main/manager.c b/main/manager.c</span><br><span>index 7accaa1..0da023a 100644</span><br><span>--- a/main/manager.c</span><br><span>+++ b/main/manager.c</span><br><span>@@ -1527,6 +1527,8 @@</span><br><span> if (!acl_change_sub) 
{</span><br><span> acl_change_sub = stasis_subscribe(ast_security_topic(),</span><br><span> acl_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/main/pbx.c b/main/pbx.c</span><br><span>index f961295..0a23735 100644</span><br><span>--- a/main/pbx.c</span><br><span>+++ b/main/pbx.c</span><br><span>@@ -8416,10 +8416,15 @@</span><br><span> if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/main/presencestate.c b/main/presencestate.c</span><br><span>index 4121bf5..65b7f69 100644</span><br><span>--- a/main/presencestate.c</span><br><span>+++ 
b/main/presencestate.c</span><br><span>@@ -514,6 +514,8 @@</span><br><span> if (!presence_state_topic_cached) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> AST_TEST_REGISTER(test_presence_chan);</span><br><span> </span><br><span>diff --git a/main/stasis.c b/main/stasis.c</span><br><span>index ed83873..93112d9 100644</span><br><span>--- a/main/stasis.c</span><br><span>+++ b/main/stasis.c</span><br><span>@@ -370,6 +370,11 @@</span><br><span> return topic->name;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+size_t stasis_topic_subscribers(const struct stasis_topic *topic)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return AST_VECTOR_SIZE(&topic->subscribers);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \internal */</span><br><span> struct stasis_subscription {</span><br><span> /*! Unique ID for this subscription */</span><br><span>@@ -391,6 +396,11 @@</span><br><span> /*! Flag set when final message for sub has been processed.</span><br><span> * Be sure join_lock is held before reading/setting. */</span><br><span> int final_message_processed;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! The message types this subscription is accepting */</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR(, char) accepted_message_types;</span><br><span style="color: hsl(120, 100%, 40%);">+ /*! 
The message filter currently in use */</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter;</span><br><span> };</span><br><span> </span><br><span> static void subscription_dtor(void *obj)</span><br><span>@@ -409,6 +419,8 @@</span><br><span> ast_taskprocessor_unreference(sub->mailbox);</span><br><span> sub->mailbox = NULL;</span><br><span> ast_cond_destroy(&sub->join_cond);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_FREE(&sub->accepted_message_types);</span><br><span> }</span><br><span> </span><br><span> /*!</span><br><span>@@ -420,19 +432,25 @@</span><br><span> static void subscription_invoke(struct stasis_subscription *sub,</span><br><span> struct stasis_message *message)</span><br><span> {</span><br><span style="color: hsl(120, 100%, 40%);">+ unsigned int final = stasis_subscription_final_message(sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ int message_type_id = stasis_message_type_id(stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /* Notify that the final message has been received */</span><br><span style="color: hsl(0, 100%, 40%);">- if (stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (final) {</span><br><span> ao2_lock(sub);</span><br><span> sub->final_message_rxed = 1;</span><br><span> ast_cond_signal(&sub->join_cond);</span><br><span> ao2_unlock(sub);</span><br><span> }</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- /* Since sub is mostly immutable, no need to lock sub */</span><br><span style="color: hsl(0, 100%, 40%);">- sub->callback(sub->data, sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||</span><br><span style="color: hsl(120, 100%, 40%);">+ (message_type_id < 
AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Since sub is mostly immutable, no need to lock sub */</span><br><span style="color: hsl(120, 100%, 40%);">+ sub->callback(sub->data, sub, message);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> </span><br><span> /* Notify that the final message has been processed */</span><br><span style="color: hsl(0, 100%, 40%);">- if (stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ if (final) {</span><br><span> ao2_lock(sub);</span><br><span> sub->final_message_processed = 1;</span><br><span> ast_cond_signal(&sub->join_cond);</span><br><span>@@ -500,6 +518,8 @@</span><br><span> sub->callback = callback;</span><br><span> sub->data = data;</span><br><span> ast_cond_init(&sub->join_cond, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_INIT(&sub->accepted_message_types, 0);</span><br><span> </span><br><span> if (topic_add_subscription(topic, sub) != 0) {</span><br><span> ao2_ref(sub, -1);</span><br><span>@@ -586,6 +606,76 @@</span><br><span> return res;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(type != 
NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(stasis_message_type_name(type) != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!type || !stasis_message_type_name(type)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Filtering is unreliable as this message type is not yet initialized</span><br><span style="color: hsl(120, 100%, 40%);">+ * so force all messages through.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* We do this for the same reason as above. 
The subscription can still operate, so allow</span><br><span style="color: hsl(120, 100%, 40%);">+ * it to do so by forcing all messages through.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(type != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ ast_assert(stasis_message_type_name(type) != NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!type || !stasis_message_type_name(type)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {</span><br><span 
style="color: hsl(120, 100%, 40%);">+ /* The memory is already allocated so this can't fail */</span><br><span style="color: hsl(120, 100%, 40%);">+ AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_subscription_set_filter(struct stasis_subscription *subscription,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!subscription) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_lock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {</span><br><span style="color: hsl(120, 100%, 40%);">+ subscription->filter = filter;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_unlock(subscription->topic);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return 0;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> void stasis_subscription_join(struct stasis_subscription *subscription)</span><br><span> {</span><br><span> if (subscription) {</span><br><span>@@ -781,6 +871,18 
@@</span><br><span> struct stasis_message *message,</span><br><span> int synchronous)</span><br><span> {</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Determine if this subscription is interested in this message. Note that final</span><br><span style="color: hsl(120, 100%, 40%);">+ * messages are special and are always invoked on the subscription.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {</span><br><span style="color: hsl(120, 100%, 40%);">+ int message_type_id = stasis_message_type_id(stasis_message_type(message));</span><br><span style="color: hsl(120, 100%, 40%);">+ if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||</span><br><span style="color: hsl(120, 100%, 40%);">+ !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&</span><br><span style="color: hsl(120, 100%, 40%);">+ !stasis_subscription_final_message(sub, message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> if (!sub->mailbox) {</span><br><span> /* Dispatch directly */</span><br><span> subscription_invoke(sub, message);</span><br><span>@@ -840,6 +942,11 @@</span><br><span> ast_assert(topic != NULL);</span><br><span> ast_assert(message != NULL);</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ /* If there are no subscribers don't bother */</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!stasis_topic_subscribers(topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*</span><br><span> * The topic may be unref'ed by the subscription invocation.</span><br><span> 
* Make sure we hold onto a reference while dispatching.</span><br><span>diff --git a/main/stasis_cache.c b/main/stasis_cache.c</span><br><span>index 3d353b3..bc975fd 100644</span><br><span>--- a/main/stasis_cache.c</span><br><span>+++ b/main/stasis_cache.c</span><br><span>@@ -87,6 +87,35 @@</span><br><span> return caching_topic->topic;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ int res;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!caching_topic) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ /* We wait to accept the stasis specific message types until now so that by default everything</span><br><span style="color: hsl(120, 100%, 40%);">+ * will flow to us.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ res |= stasis_subscription_accept_message_type(caching_topic->sub, type);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return res;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: 
hsl(120, 100%, 40%);">+int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!caching_topic) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ return stasis_subscription_set_filter(caching_topic->sub, filter);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)</span><br><span> {</span><br><span> if (!caching_topic) {</span><br><span>@@ -856,11 +885,13 @@</span><br><span> /* Update the cache */</span><br><span> snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);</span><br><span> if (snapshots.old || msg_put) {</span><br><span style="color: hsl(0, 100%, 40%);">- update = update_create(snapshots.old, msg_put);</span><br><span style="color: hsl(0, 100%, 40%);">- if (update) {</span><br><span style="color: hsl(0, 100%, 40%);">- stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (stasis_topic_subscribers(caching_topic->topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ update = update_create(snapshots.old, msg_put);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (update) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_ref(update, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> }</span><br><span style="color: hsl(0, 100%, 40%);">- 
ao2_cleanup(update);</span><br><span> } else {</span><br><span> ast_debug(1,</span><br><span> "Attempting to remove an item from the %s cache that isn't there: %s %s\n",</span><br><span>@@ -873,11 +904,13 @@</span><br><span> caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,</span><br><span> snapshots.aggregate_new);</span><br><span> }</span><br><span style="color: hsl(0, 100%, 40%);">- update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);</span><br><span style="color: hsl(0, 100%, 40%);">- if (update) {</span><br><span style="color: hsl(0, 100%, 40%);">- stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (stasis_topic_subscribers(caching_topic->topic)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (update) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_publish(caching_topic->topic, update);</span><br><span style="color: hsl(120, 100%, 40%);">+ ao2_ref(update, -1);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> }</span><br><span style="color: hsl(0, 100%, 40%);">- ao2_cleanup(update);</span><br><span> }</span><br><span> </span><br><span> ao2_cleanup(snapshots.old);</span><br><span>diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c</span><br><span>index f0e34b9..04d8164 100644</span><br><span>--- a/main/stasis_cache_pattern.c</span><br><span>+++ b/main/stasis_cache_pattern.c</span><br><span>@@ -217,3 +217,21 @@</span><br><span> }</span><br><span> return stasis_caching_get_topic(one->topic_cached);</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_message_type 
*type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!one) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ return stasis_caching_accept_message_type(one->topic_cached, type);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_cp_single_set_filter(struct stasis_cp_single *one,</span><br><span style="color: hsl(120, 100%, 40%);">+ enum stasis_subscription_message_filter filter)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!one) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return -1;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+ return stasis_caching_set_filter(one->topic_cached, filter);</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span>diff --git a/main/stasis_message.c b/main/stasis_message.c</span><br><span>index 19f4a92..1fdbe85 100644</span><br><span>--- a/main/stasis_message.c</span><br><span>+++ b/main/stasis_message.c</span><br><span>@@ -39,9 +39,11 @@</span><br><span> struct stasis_message_vtable *vtable;</span><br><span> char *name;</span><br><span> unsigned int hash;</span><br><span style="color: hsl(120, 100%, 40%);">+ int id;</span><br><span> };</span><br><span> </span><br><span> static struct stasis_message_vtable null_vtable = {};</span><br><span style="color: hsl(120, 100%, 40%);">+static int message_type_id;</span><br><span> </span><br><span> static void message_type_dtor(void *obj)</span><br><span> {</span><br><span>@@ -78,6 +80,7 @@</span><br><span> }</span><br><span> type->hash = ast_hashtab_hash_string(name);</span><br><span> type->vtable = vtable;</span><br><span 
style="color: hsl(120, 100%, 40%);">+ type->id = ast_atomic_fetchadd_int(&message_type_id, +1);</span><br><span> *result = type;</span><br><span> </span><br><span> return STASIS_MESSAGE_TYPE_SUCCESS;</span><br><span>@@ -93,6 +96,11 @@</span><br><span> return type->hash;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+int stasis_message_type_id(const struct stasis_message_type *type)</span><br><span style="color: hsl(120, 100%, 40%);">+{</span><br><span style="color: hsl(120, 100%, 40%);">+ return type->id;</span><br><span style="color: hsl(120, 100%, 40%);">+}</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /*! \internal */</span><br><span> struct stasis_message {</span><br><span> /*! Time the message was created */</span><br><span>diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c</span><br><span>index 41d426b..41ebc7e 100644</span><br><span>--- a/main/stasis_message_router.c</span><br><span>+++ b/main/stasis_message_router.c</span><br><span>@@ -235,6 +235,9 @@</span><br><span> return NULL;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ /* We need to receive subscription change messages so we know when our subscription goes away */</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> return router;</span><br><span> }</span><br><span> </span><br><span>@@ -316,6 +319,14 @@</span><br><span> }</span><br><span> ao2_lock(router);</span><br><span> res = route_table_add(&router->routes, message_type, callback, data);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!res) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(router->subscription, message_type);</span><br><span style="color: hsl(120, 100%, 40%);">+ /* Until a 
specific message type was added we would already drop the message, so being</span><br><span style="color: hsl(120, 100%, 40%);">+ * selective now doesn't harm us. If we have a default route then we are already forced</span><br><span style="color: hsl(120, 100%, 40%);">+ * to filter nothing and messages will come in regardless.</span><br><span style="color: hsl(120, 100%, 40%);">+ */</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> ao2_unlock(router);</span><br><span> return res;</span><br><span> }</span><br><span>@@ -334,6 +345,10 @@</span><br><span> }</span><br><span> ao2_lock(router);</span><br><span> res = route_table_add(&router->cache_routes, message_type, callback, data);</span><br><span style="color: hsl(120, 100%, 40%);">+ if (!res) {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span> ao2_unlock(router);</span><br><span> return res;</span><br><span> }</span><br><span>@@ -378,6 +393,9 @@</span><br><span> router->default_route.callback = callback;</span><br><span> router->default_route.data = data;</span><br><span> ao2_unlock(router);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /* While this implementation can never fail, it used to be able to */</span><br><span> return 0;</span><br><span> }</span><br><span>diff --git a/res/parking/parking_applications.c 
b/res/parking/parking_applications.c</span><br><span>index dd2fb75..f9b3e85 100644</span><br><span>--- a/res/parking/parking_applications.c</span><br><span>+++ b/res/parking/parking_applications.c</span><br><span>@@ -868,6 +868,10 @@</span><br><span> return;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ if (ast_parked_call_type() != stasis_message_type(message)) {</span><br><span style="color: hsl(120, 100%, 40%);">+ return;</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> if (payload->event_type != PARKED_CALL) {</span><br><span> /* We are only concerned with calls parked */</span><br><span> return;</span><br><span>@@ -954,6 +958,10 @@</span><br><span> return -1;</span><br><span> }</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> /* Now for the fun part... park it! 
*/</span><br><span> ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);</span><br><span> </span><br><span>diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c</span><br><span>index cbc23fa..ee2b5a1 100644</span><br><span>--- a/res/parking/parking_bridge_features.c</span><br><span>+++ b/res/parking/parking_bridge_features.c</span><br><span>@@ -213,6 +213,9 @@</span><br><span> if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> datastore->data = parked_datastore;</span><br><span> </span><br><span>diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c</span><br><span>index 6d0a4c0..83558ba 100644</span><br><span>--- a/res/parking/parking_manager.c</span><br><span>+++ b/res/parking/parking_manager.c</span><br><span>@@ -686,6 +686,8 @@</span><br><span> {</span><br><span> if (!parking_sub) {</span><br><span> parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_hep_rtcp.c 
b/res/res_hep_rtcp.c</span><br><span>index c3abbc1..f73cd44 100644</span><br><span>--- a/res/res_hep_rtcp.c</span><br><span>+++ b/res/res_hep_rtcp.c</span><br><span>@@ -167,6 +167,9 @@</span><br><span> if (!stasis_rtp_subscription) {</span><br><span> return AST_MODULE_LOAD_DECLINE;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span>diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c</span><br><span>index 4cd892c..83bff88 100644</span><br><span>--- a/res/res_pjsip_mwi.c</span><br><span>+++ b/res/res_pjsip_mwi.c</span><br><span>@@ -269,6 +269,9 @@</span><br><span> ao2_ref(mwi_sub, -1);</span><br><span> mwi_stasis_sub = NULL;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> return mwi_stasis_sub;</span><br><span> }</span><br><span> </span><br><span>@@ -1364,7 +1367,11 @@</span><br><span> if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {</span><br><span> ast_sip_push_task(NULL, send_initial_notify_all, NULL);</span><br><span> } else {</span><br><span style="color: hsl(0, 100%, 40%);">- 
stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_subscription *sub;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c</span><br><span>index 0d815ad..7aba734 100644</span><br><span>--- a/res/res_pjsip_outbound_registration.c</span><br><span>+++ b/res/res_pjsip_outbound_registration.c</span><br><span>@@ -2282,6 +2282,8 @@</span><br><span> </span><br><span> network_change_sub = stasis_subscribe(ast_system_topic(),</span><br><span> network_change_stasis_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> return AST_MODULE_LOAD_SUCCESS;</span><br><span> }</span><br><span>diff --git a/res/res_pjsip_publish_asterisk.c b/res/res_pjsip_publish_asterisk.c</span><br><span>index 220ba0b..692f9a7 100644</span><br><span>--- a/res/res_pjsip_publish_asterisk.c</span><br><span>+++ b/res/res_pjsip_publish_asterisk.c</span><br><span>@@ -360,6 +360,9 @@</span><br><span> ao2_ref(datastore, -1);</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ 
stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> cached = stasis_cache_dump(ast_device_state_cache(), NULL);</span><br><span> ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);</span><br><span>@@ -435,6 +438,9 @@</span><br><span> ao2_ref(datastore, -1);</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);</span><br><span> ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);</span><br><span>diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c</span><br><span>index b5ee159..9e8a32b 100644</span><br><span>--- a/res/res_pjsip_pubsub.c</span><br><span>+++ b/res/res_pjsip_pubsub.c</span><br><span>@@ -5567,7 +5567,11 @@</span><br><span> if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {</span><br><span> ast_sip_push_task(NULL, subscription_persistence_load, NULL);</span><br><span> } else {</span><br><span style="color: hsl(0, 100%, 40%);">- stasis_subscribe_pool(ast_manager_get_topic(), 
subscription_persistence_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ struct stasis_subscription *sub;</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> </span><br><span> ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,</span><br><span>diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c</span><br><span>index 1e6ca7f..3dfaabc 100644</span><br><span>--- a/res/res_pjsip_refer.c</span><br><span>+++ b/res/res_pjsip_refer.c</span><br><span>@@ -686,6 +686,10 @@</span><br><span> ast_channel_unlock(chan);</span><br><span> </span><br><span> ao2_cleanup(refer->progress);</span><br><span style="color: hsl(120, 100%, 40%);">+ } else {</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> }</span><br><span> }</span><br><span> </span><br><span>diff --git a/res/res_security_log.c b/res/res_security_log.c</span><br><span>index 555ba23..95429ca 100644</span><br><span>--- a/res/res_security_log.c</span><br><span>+++ b/res/res_security_log.c</span><br><span>@@ -141,6 +141,8 @@</span><br><span> LOG_SECURITY = -1;</span><br><span> return 
AST_MODULE_LOAD_DECLINE;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> ast_verb(3, "Security Logging Enabled\n");</span><br><span> </span><br><span>diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c</span><br><span>index be09b15..1c80f9e 100644</span><br><span>--- a/res/res_stasis_device_state.c</span><br><span>+++ b/res/res_stasis_device_state.c</span><br><span>@@ -394,6 +394,9 @@</span><br><span> ao2_ref(sub, -1);</span><br><span> return -1;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);</span><br><span> ao2_unlock(device_state_subscriptions);</span><br><span>diff --git a/res/res_xmpp.c b/res/res_xmpp.c</span><br><span>index b72581f..7d032ad 100644</span><br><span>--- a/res/res_xmpp.c</span><br><span>+++ b/res/res_xmpp.c</span><br><span>@@ -1626,11 +1626,15 @@</span><br><span> if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {</span><br><span> return;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(client->mwi_sub, 
STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {</span><br><span> client->mwi_sub = stasis_unsubscribe(client->mwi_sub);</span><br><span> return;</span><br><span> }</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());</span><br><span style="color: hsl(120, 100%, 40%);">+ stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);</span><br><span> </span><br><span> cached = stasis_cache_dump(ast_device_state_cache(), NULL);</span><br><span> ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.asterisk.org/10639">change 10639</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.asterisk.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.asterisk.org/10639"/><meta itemprop="name" content="View Change"/></div></div>
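<p>The hunks above repeat one pattern: subscribe, register each accepted message type, then switch the subscription to selective filtering. The following is a minimal, self-contained sketch of that idea, not the Asterisk implementation. Every name in it (struct subscription, subscription_accept_type, subscription_set_filter, publish, the FILTER_* values) is a hypothetical stand-in, and message types are modeled as plain strings purely for illustration.</p>

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for illustration only; these are NOT the stasis types. */
enum filter_mode {
    FILTER_NONE,       /* default: every message is delivered */
    FILTER_FORCED_ALL, /* explicitly request every message */
    FILTER_SELECTIVE   /* deliver only accepted message types */
};

#define MAX_ACCEPTED 8

struct subscription {
    enum filter_mode mode;
    const char *accepted[MAX_ACCEPTED]; /* message type names (assumption: strings) */
    size_t accepted_count;
    int delivered; /* counts messages that reached the subscriber */
};

/* Register a message type the subscriber is interested in. Returns 0 on success. */
static int subscription_accept_type(struct subscription *sub, const char *type)
{
    assert(sub != NULL && type != NULL);
    if (sub->accepted_count >= MAX_ACCEPTED) {
        return -1;
    }
    sub->accepted[sub->accepted_count++] = type;
    return 0;
}

/* Choose how the subscription filters incoming messages. */
static void subscription_set_filter(struct subscription *sub, enum filter_mode mode)
{
    assert(sub != NULL);
    sub->mode = mode;
}

/* Decide, before dispatch, whether the subscriber should see this type. */
static int subscription_wants(const struct subscription *sub, const char *type)
{
    size_t i;

    if (sub->mode != FILTER_SELECTIVE) {
        return 1; /* unfiltered or forced-all: previous behavior, deliver everything */
    }
    for (i = 0; i < sub->accepted_count; i++) {
        if (strcmp(sub->accepted[i], type) == 0) {
            return 1;
        }
    }
    return 0;
}

/* Publish one message: filtered messages never reach the subscriber. */
static void publish(struct subscription *sub, const char *type)
{
    if (!subscription_wants(sub, type)) {
        return;
    }
    sub->delivered++;
}
```

<p>In this sketch a subscription left at FILTER_NONE still receives every message, which mirrors the backward-compatible default described in the commit message. Only after accepted types are registered and the mode is set to FILTER_SELECTIVE are other types dropped before dispatch.</p>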
<div style="display:none"> Gerrit-Project: asterisk </div>
<div style="display:none"> Gerrit-Branch: 16 </div>
<div style="display:none"> Gerrit-MessageType: merged </div>
<div style="display:none"> Gerrit-Change-Id: I99bee23895baa0a117985d51683f7963b77aa190 </div>
<div style="display:none"> Gerrit-Change-Number: 10639 </div>
<div style="display:none"> Gerrit-PatchSet: 2 </div>
<div style="display:none"> Gerrit-Owner: Joshua Colp &lt;jcolp@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Benjamin Keith Ford &lt;bford@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Corey Farrell &lt;git@cfware.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: George Joseph &lt;gjoseph@digium.com&gt; </div>
<div style="display:none"> Gerrit-Reviewer: Jenkins2 (1000185) </div>
<div style="display:none"> Gerrit-Reviewer: Joshua Colp &lt;jcolp@digium.com&gt; </div>