[Asterisk-code-review] stasis: Add internal filtering of messages. (asterisk[master])

Joshua Colp asteriskteam at digium.com
Sun Oct 14 10:10:57 CDT 2018


Joshua Colp has uploaded this change for review. ( https://gerrit.asterisk.org/10479 )


Change subject: stasis: Add internal filtering of messages.
......................................................................

stasis: Add internal filtering of messages.

This change adds the ability for subscriptions to indicate
which message types they are interested in accepting. By
doing so the filtering is done before being dispatched
to the subscriber, reducing the amount of work that has
to be done.

This is optional and if a subscriber does not add
message types they wish to accept the previous behavior
is preserved and they receive all messages.

There is also the ability to explicitly force the reception
of all messages for cases such as AMI or ARI where a large
number of messages are expected that are then generically
converted into a different format.

ASTERISK-28103

Change-Id: I99bee23895baa0a117985d51683f7963b77aa190
---
M apps/app_queue.c
M channels/chan_dahdi.c
M channels/chan_iax2.c
M channels/chan_mgcp.c
M channels/chan_sip.c
M channels/chan_skinny.c
M channels/sig_pri.c
M include/asterisk/stasis.h
M include/asterisk/stasis_cache_pattern.h
M include/asterisk/stasis_message_router.h
M main/ccss.c
M main/devicestate.c
M main/endpoints.c
M main/manager.c
M main/pbx.c
M main/presencestate.c
M main/stasis.c
M main/stasis_cache.c
M main/stasis_cache_pattern.c
M main/stasis_message.c
M main/stasis_message_router.c
M res/parking/parking_applications.c
M res/parking/parking_bridge_features.c
M res/parking/parking_manager.c
M res/res_hep_rtcp.c
M res/res_pjsip_mwi.c
M res/res_pjsip_outbound_registration.c
M res/res_pjsip_publish_asterisk.c
M res/res_pjsip_refer.c
M res/res_security_log.c
M res/res_stasis_device_state.c
M res/res_xmpp.c
32 files changed, 303 insertions(+), 12 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/79/10479/1

diff --git a/apps/app_queue.c b/apps/app_queue.c
index 3dc735a..0a0d70d 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -11334,6 +11334,7 @@
 	if (!device_state_sub) {
 		err = -1;
 	}
+	stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
 
 	manager_topic = ast_manager_get_topic();
 	queue_topic = ast_queue_topic_all();
diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c
index f4f6514..4a12404 100644
--- a/channels/chan_dahdi.c
+++ b/channels/chan_dahdi.c
@@ -12594,6 +12594,9 @@
 				 * knows that we care about it.  Then, chan_dahdi will get the MWI from the
 				 * event cache instead of checking the mailbox directly. */
 				tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+				if (tmp->mwi_event_sub) {
+					stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type());
+				}
 			}
 		}
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c
index 01d42b5..a7c7d18 100644
--- a/channels/chan_iax2.c
+++ b/channels/chan_iax2.c
@@ -1456,6 +1456,9 @@
 	if (!network_change_sub) {
 		network_change_sub = stasis_subscribe(ast_system_topic(),
 			network_change_stasis_cb, NULL);
+		if (network_change_sub) {
+			stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+		}
 	}
 }
 
@@ -1469,6 +1472,9 @@
 	if (!acl_change_sub) {
 		acl_change_sub = stasis_subscribe(ast_security_topic(),
 			acl_change_stasis_cb, NULL);
+		if (acl_change_sub) {
+			stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+		}
 	}
 }
 
@@ -13072,6 +13078,9 @@
 			 * mailboxes.  However, we just grab the events out of the cache when it
 			 * is time to send MWI, since it is only sent with a REGACK. */
 			peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+			if (peer->mwi_event_sub) {
+				stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type());
+			}
 		}
 	}
 
diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c
index 2ac7690..5eee6e9 100644
--- a/channels/chan_mgcp.c
+++ b/channels/chan_mgcp.c
@@ -4242,6 +4242,9 @@
 						 * knows that we care about it.  Then, chan_mgcp will get the MWI from the
 						 * event cache instead of checking the mailbox directly. */
 						e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL);
+						if (e->mwi_event_sub) {
+							stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type());
+						}
 					}
 				}
 				snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index 42362b8..d6186cc 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -17494,6 +17494,9 @@
 	if (!network_change_sub) {
 		network_change_sub = stasis_subscribe(ast_system_topic(),
 			network_change_stasis_cb, NULL);
+		if (network_change_sub) {
+			stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+		}
 	}
 }
 
@@ -17507,6 +17510,9 @@
 	if (!acl_change_sub) {
 		acl_change_sub = stasis_subscribe(ast_security_topic(),
 			acl_change_stasis_cb, NULL);
+		if (acl_change_sub) {
+			stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+		}
 	}
 
 }
@@ -28379,6 +28385,10 @@
 		mailbox_specific_topic = ast_mwi_topic(mailbox->id);
 		if (mailbox_specific_topic) {
 			mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer);
+			if (mailbox->event_sub) {
+				stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type());
+				stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type());
+			}
 		}
 	}
 }
diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c
index 2b13e5e..21671e2 100644
--- a/channels/chan_skinny.c
+++ b/channels/chan_skinny.c
@@ -8334,6 +8334,9 @@
 		mailbox_specific_topic = ast_mwi_topic(l->mailbox);
 		if (mailbox_specific_topic) {
 			l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
+			if (l->mwi_event_sub) {
+				stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type());
+			}
 		}
 	}
 
diff --git a/channels/sig_pri.c b/channels/sig_pri.c
index fbc4e40..e968602 100644
--- a/channels/sig_pri.c
+++ b/channels/sig_pri.c
@@ -9130,6 +9130,8 @@
 		if (!pri->mbox[i].sub) {
 			ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
 				sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id);
+		} else {
+			stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type());
 		}
 #if defined(HAVE_PRI_MWI_V2)
 		if (ast_strlen_zero(pri->mbox[i].vm_number)) {
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 8329dd0..650a296 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -327,6 +327,14 @@
 unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
 
 /*!
+ * \brief Gets the id of a given message type
+ * \param type The type to get the id of. Must not be NULL.
+ * \return The id
+ * \since 17.0.0
+ */
+int stasis_message_type_id(const struct stasis_message_type *type);
+
+/*!
  * \brief Check whether a message type is declined
  *
  * \param name The name of the message type to check
@@ -490,6 +498,14 @@
 const char *stasis_topic_name(const struct stasis_topic *topic);
 
 /*!
+ * \brief Return the number of subscribers of a topic.
+ * \param topic Topic. Must not be NULL.
+ * \return Number of subscribers of the topic.
+ * \since 17.0.0
+ */
+size_t stasis_topic_subscribers(const struct stasis_topic *topic);
+
+/*!
  * \brief Publish a message to a topic's subscribers.
  * \param topic Topic.
  * \param message Message to publish.
@@ -554,6 +570,10 @@
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12
+ *
+ * \note The callback will be invoked with a message indicating that it has
+ * been subscribed. This happens before accepted message types can be set,
+ * so the callback must expect to receive that message.
  */
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
 	stasis_subscription_cb callback, void *data);
@@ -579,11 +599,47 @@
  * \return New \ref stasis_subscription object.
  * \return \c NULL on error.
  * \since 12.8.0
+ *
+ * \note The callback will be invoked with a message indicating that it has
+ * been subscribed. This happens before accepted message types can be set,
+ * so the callback must expect to receive that message.
  */
 struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
 	stasis_subscription_cb callback, void *data);
 
 /*!
+ * \brief Indicate to a subscription that we are interested in a message type.
+ *
+ * This will cause the subscription to allow the given message type to be
+ * raised to our subscription callback. Once a type has been accepted, other
+ * types are filtered out unless all messages have been explicitly accepted.
+ *
+ * \param subscription Subscription to add message type to.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ *
+ * \note If you want to use \ref stasis_subscription_final_message you will
+ * need to accept \ref stasis_subscription_change_type as a message type.
+ */
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+	struct stasis_message_type *type);
+
+/*!
+ * \brief Indicate to a subscription that we are interested in ALL messages.
+ *
+ * This forces the subscription to raise all messages to our subscription
+ * callback, even when individual message types have also been accepted.
+ *
+ * \param subscription Subscription that should receive all messages.
+ *
+ * \since 17.0.0
+ */
+void stasis_subscription_accept_all(struct stasis_subscription *subscription);
+
+/*!
  * \brief Cancel a subscription.
  *
  * Note that in an asynchronous system, there may still be messages queued or
@@ -1032,6 +1088,23 @@
 	struct stasis_caching_topic *caching_topic);
 
 /*!
+ * \brief Indicate to a caching topic that we are interested in a message type.
+ *
+ * This will cause the caching topic to receive messages of the given message
+ * type, along with the stasis cache clear and subscription change message
+ * types. This enables internal filtering in the stasis message bus.
+ *
+ * \param caching_topic The caching topic. Must not be NULL.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+	struct stasis_message_type *type);
+
+/*!
  * \brief A message which instructs the caching topic to remove an entry from
  * its cache.
  *
@@ -1221,6 +1294,12 @@
 void stasis_log_bad_type_access(const char *name);
 
 /*!
+ * \brief The maximum number of Stasis message types that can be registered.
+ * \since 17.0.0
+ */
+#define STASIS_MESSAGE_TYPES_MAXIMUM 256
+
+/*!
  * \brief Boiler-plate messaging macro for defining public message types.
  *
  * \code
diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h
index e61d3e9..c0626f8 100644
--- a/include/asterisk/stasis_cache_pattern.h
+++ b/include/asterisk/stasis_cache_pattern.h
@@ -169,4 +169,21 @@
 struct stasis_topic *stasis_cp_single_topic_cached(
 	struct stasis_cp_single *one);
 
+/*!
+ * \brief Indicate to an instance that we are interested in a message type.
+ *
+ * This will cause the instance's caching topic to receive messages of the
+ * given message type. This enables internal filtering in the stasis message
+ * bus to reduce messages.
+ *
+ * \param one One side of the cache pattern.
+ * \param type The message type we wish to receive.
+ * \retval 0 on success
+ * \retval -1 failure
+ *
+ * \since 17.0.0
+ */
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+	struct stasis_message_type *type);
+
 #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 50270a7..8dcdfcc 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -233,6 +233,10 @@
  * \retval -1 on failure
  *
  * \since 12
+ *
+ * \note Setting a default callback will automatically cause the underlying
+ * subscription to receive all messages and not be filtered. If filtering is
+ * desired then a specific route for each message type should be provided.
  */
 int stasis_message_router_set_default(struct stasis_message_router *router,
 				      stasis_subscription_cb callback,
diff --git a/main/ccss.c b/main/ccss.c
index 9cf16e3..03db6bb 100644
--- a/main/ccss.c
+++ b/main/ccss.c
@@ -1433,6 +1433,7 @@
 		cc_unref(generic_list, "Failed to subscribe to device state");
 		return NULL;
 	}
+	stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type());
 	generic_list->current_state = ast_device_state(monitor->interface->device_name);
 	ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list");
 	return generic_list;
@@ -2804,6 +2805,8 @@
 	if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {
 		return -1;
 	}
+	stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type());
+	stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type());
 	cc_ref(agent, "Ref agent for subscription");
 	return 0;
 }
diff --git a/main/devicestate.c b/main/devicestate.c
index 7dcbe82..86fe372 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -920,6 +920,7 @@
 	if (!device_state_topic_cached) {
 		return -1;
 	}
+	stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type());
 
 	devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
 		devstate_change_cb, NULL);
@@ -927,6 +928,7 @@
 		ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
 		return -1;
 	}
+	stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type());
 
 	return 0;
 }
diff --git a/main/endpoints.c b/main/endpoints.c
index 992da1f..ad45053 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -200,7 +200,7 @@
 	endpoint_publish_snapshot(endpoint);
 }
 
-static void endpoint_default(void *data,
+static void endpoint_subscription_change(void *data,
 	struct stasis_subscription *sub,
 	struct stasis_message *message)
 {
@@ -261,6 +261,7 @@
 		if (!endpoint->topics) {
 			return NULL;
 		}
+		stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
 
 		endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
 		if (!endpoint->router) {
@@ -269,8 +270,9 @@
 		r |= stasis_message_router_add(endpoint->router,
 			ast_channel_snapshot_type(), endpoint_cache_clear,
 			endpoint);
-		r |= stasis_message_router_set_default(endpoint->router,
-			endpoint_default, endpoint);
+		r |= stasis_message_router_add(endpoint->router,
+			stasis_subscription_change_type(), endpoint_subscription_change,
+			endpoint);
 		if (r) {
 			return NULL;
 		}
@@ -286,6 +288,7 @@
 		if (!endpoint->topics) {
 			return NULL;
 		}
+		stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
 
 		ao2_link(tech_endpoints, endpoint);
 	}
diff --git a/main/manager.c b/main/manager.c
index 9d67e0c..6e6d9d8 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1527,6 +1527,9 @@
 	if (!acl_change_sub) {
 		acl_change_sub = stasis_subscribe(ast_security_topic(),
 			acl_change_stasis_cb, NULL);
+		if (acl_change_sub) {
+			stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type());
+		}
 	}
 }
 
diff --git a/main/pbx.c b/main/pbx.c
index 727018b..9718133 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -8416,10 +8416,13 @@
 	if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
 		return -1;
 	}
+	stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type());
+	stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type());
 
 	if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) {
 		return -1;
 	}
+	stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type());
 
 	return 0;
 }
diff --git a/main/presencestate.c b/main/presencestate.c
index 4121bf5..e287748 100644
--- a/main/presencestate.c
+++ b/main/presencestate.c
@@ -514,6 +514,7 @@
 	if (!presence_state_topic_cached) {
 		return -1;
 	}
+	stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type());
 
 	AST_TEST_REGISTER(test_presence_chan);
 
diff --git a/main/stasis.c b/main/stasis.c
index 51f01c0..8815ef3 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -370,6 +370,18 @@
 	return topic->name;
 }
 
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+	return AST_VECTOR_SIZE(&topic->subscribers); /* read without taking the topic lock */
+}
+
+/*! \internal \brief How a subscription decides which message types reach it */
+enum stasis_all_messages {
+	STASIS_ALL_MESSAGES_DISABLED = 0, /*!< The subscription is selective on message types */
+	STASIS_ALL_MESSAGES_DEFAULT,      /*!< The subscription is by default accepting all */
+	STASIS_ALL_MESSAGES_FORCED,       /*!< The subscription has forced all messages */
+};
+
 /*! \internal */
 struct stasis_subscription {
 	/*! Unique ID for this subscription */
@@ -391,6 +403,11 @@
 	/*! Flag set when final message for sub has been processed.
 	 *  Be sure join_lock is held before reading/setting. */
 	int final_message_processed;
+
+	/*! The message types this subscription is accepting */
+	char accepted_message_types[STASIS_MESSAGE_TYPES_MAXIMUM];
+	/*! Whether all messages are being accepted by the subscription */
+	enum stasis_all_messages all_messages;
 };
 
 static void subscription_dtor(void *obj)
@@ -420,19 +437,24 @@
 static void subscription_invoke(struct stasis_subscription *sub,
 				  struct stasis_message *message)
 {
+	unsigned int final = stasis_subscription_final_message(sub, message);
+	int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+
 	/* Notify that the final message has been received */
-	if (stasis_subscription_final_message(sub, message)) {
+	if (final) {
 		ao2_lock(sub);
 		sub->final_message_rxed = 1;
 		ast_cond_signal(&sub->join_cond);
 		ao2_unlock(sub);
 	}
 
-	/* Since sub is mostly immutable, no need to lock sub */
-	sub->callback(sub->data, sub, message);
+	if (!final || sub->all_messages != STASIS_ALL_MESSAGES_DISABLED || sub->accepted_message_types[message_type_id]) {
+		/* Since sub is mostly immutable, no need to lock sub */
+		sub->callback(sub->data, sub, message);
+	}
 
 	/* Notify that the final message has been processed */
-	if (stasis_subscription_final_message(sub, message)) {
+	if (final) {
 		ao2_lock(sub);
 		sub->final_message_processed = 1;
 		ast_cond_signal(&sub->join_cond);
@@ -500,6 +522,7 @@
 	sub->callback = callback;
 	sub->data = data;
 	ast_cond_init(&sub->join_cond, NULL);
+	sub->all_messages = STASIS_ALL_MESSAGES_DEFAULT;
 
 	if (topic_add_subscription(topic, sub) != 0) {
 		ao2_ref(sub, -1);
@@ -583,6 +606,38 @@
 	return res;
 }
 
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+	struct stasis_message_type *type)
+{
+	/* A NULL type has no id to look up; fail without touching the filter so
+	 * the subscription keeps whatever mode it is currently in. */
+	if (!subscription || !type) {
+		return -1;
+	}
+
+	ao2_lock(subscription->topic);
+	subscription->accepted_message_types[stasis_message_type_id(type)] = 1;
+	/* The first explicitly accepted type turns the default accept-all mode
+	 * into selective filtering; a forced accept-all is left untouched. */
+	if (subscription->all_messages == STASIS_ALL_MESSAGES_DEFAULT) {
+		subscription->all_messages = STASIS_ALL_MESSAGES_DISABLED;
+	}
+	ao2_unlock(subscription->topic);
+
+	return 0;
+}
+
+void stasis_subscription_accept_all(struct stasis_subscription *subscription)
+{
+	if (subscription) {
+		/* Forced mode wins over selective filtering: accepting individual
+		 * message types only ever changes the default accept-all mode. */
+		ao2_lock(subscription->topic);
+		subscription->all_messages = STASIS_ALL_MESSAGES_FORCED;
+		ao2_unlock(subscription->topic);
+	}
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
 	if (subscription) {
@@ -778,6 +833,15 @@
 	struct stasis_message *message,
 	int synchronous)
 {
+	/* Determine if this subscription is interested in this message. Note that final
+	 * messages are special and are always invoked on the subscription.
+	 */
+	if (sub->all_messages == STASIS_ALL_MESSAGES_DISABLED &&
+		!sub->accepted_message_types[stasis_message_type_id(stasis_message_type(message))] &&
+		!stasis_subscription_final_message(sub, message)) {
+		return;
+	}
+
 	if (!sub->mailbox) {
 		/* Dispatch directly */
 		subscription_invoke(sub, message);
@@ -837,6 +901,11 @@
 	ast_assert(topic != NULL);
 	ast_assert(message != NULL);
 
+	/* If there are no subscribers don't bother */
+	if (!stasis_topic_subscribers(topic)) {
+		return;
+	}
+
 	/*
 	 * The topic may be unref'ed by the subscription invocation.
 	 * Make sure we hold onto a reference while dispatching.
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index 3d353b3..d03758d 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -87,6 +87,20 @@
 	return caching_topic->topic;
 }
 
+int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
+	struct stasis_message_type *type)
+{
+	struct stasis_subscription *sub = caching_topic->sub;
+
+	/* The stasis specific message types are deliberately not accepted before
+	 * this point, so that by default everything flows to the caching topic's
+	 * subscription. The chain below stops at the first call that fails and
+	 * any failure is reported as -1. */
+	return (stasis_subscription_accept_message_type(sub, stasis_cache_clear_type())
+		|| stasis_subscription_accept_message_type(sub, stasis_subscription_change_type())
+		|| stasis_subscription_accept_message_type(sub, type)) ? -1 : 0;
+}
+
 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
 {
 	if (!caching_topic) {
@@ -856,11 +870,13 @@
 		/* Update the cache */
 		snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
 		if (snapshots.old || msg_put) {
-			update = update_create(snapshots.old, msg_put);
-			if (update) {
-				stasis_publish(caching_topic->topic, update);
+			if (stasis_topic_subscribers(caching_topic->topic)) {
+				update = update_create(snapshots.old, msg_put);
+				if (update) {
+					stasis_publish(caching_topic->topic, update);
+					ao2_ref(update, -1);
+				}
 			}
-			ao2_cleanup(update);
 		} else {
 			ast_debug(1,
 				"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
@@ -868,7 +884,7 @@
 				stasis_message_type_name(msg_type), msg_id);
 		}
 
-		if (snapshots.aggregate_old != snapshots.aggregate_new) {
+		if (stasis_topic_subscribers(caching_topic->topic) && snapshots.aggregate_old != snapshots.aggregate_new) {
 			if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
 				caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
 					snapshots.aggregate_new);
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index f0e34b9..05d95a5 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -217,3 +217,12 @@
 	}
 	return stasis_caching_get_topic(one->topic_cached);
 }
+
+int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
+	struct stasis_message_type *type)
+{
+	if (!one) {
+		return -1; /* no instance to forward to: report failure as an int */
+	}
+	return stasis_caching_accept_message_type(one->topic_cached, type);
+}
diff --git a/main/stasis_message.c b/main/stasis_message.c
index 19f4a92..50dbae1 100644
--- a/main/stasis_message.c
+++ b/main/stasis_message.c
@@ -39,9 +39,11 @@
 	struct stasis_message_vtable *vtable;
 	char *name;
 	unsigned int hash;
+	int id;
 };
 
 static struct stasis_message_vtable null_vtable = {};
+static int message_type_id;
 
 static void message_type_dtor(void *obj)
 {
@@ -61,6 +63,12 @@
 		return STASIS_MESSAGE_TYPE_DECLINED;
 	}
 
+	/* If there is no more space for message type registration then decline this type */
+	if (message_type_id == STASIS_MESSAGE_TYPES_MAXIMUM) {
+		ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to maximum number of registered message types being reached\n", name);
+		return STASIS_MESSAGE_TYPE_DECLINED;
+	}
+
 	type = ao2_t_alloc_options(sizeof(*type), message_type_dtor,
 		AO2_ALLOC_OPT_LOCK_NOLOCK, name ?: "");
 	if (!type) {
@@ -78,6 +86,7 @@
 	}
 	type->hash = ast_hashtab_hash_string(name);
 	type->vtable = vtable;
+	type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
 	*result = type;
 
 	return STASIS_MESSAGE_TYPE_SUCCESS;
@@ -93,6 +102,11 @@
 	return type->hash;
 }
 
+int stasis_message_type_id(const struct stasis_message_type *type)
+{
+	return type->id; /* set from the message_type_id counter when the type is created */
+}
+
 /*! \internal */
 struct stasis_message {
 	/*! Time the message was created */
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 41d426b..7e04d46 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -235,6 +235,9 @@
 		return NULL;
 	}
 
+	/* We need to receive subscription change messages so we know when our subscription goes away */
+	stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
+
 	return router;
 }
 
@@ -316,6 +319,9 @@
 	}
 	ao2_lock(router);
 	res = route_table_add(&router->routes, message_type, callback, data);
+	if (!res) {
+		stasis_subscription_accept_message_type(router->subscription, message_type);
+	}
 	ao2_unlock(router);
 	return res;
 }
@@ -334,6 +340,9 @@
 	}
 	ao2_lock(router);
 	res = route_table_add(&router->cache_routes, message_type, callback, data);
+	if (!res) {
+		stasis_subscription_accept_message_type(router->subscription, message_type);
+	}
 	ao2_unlock(router);
 	return res;
 }
@@ -378,6 +387,9 @@
 	router->default_route.callback = callback;
 	router->default_route.data = data;
 	ao2_unlock(router);
+
+	stasis_subscription_accept_all(router->subscription);
+
 	/* While this implementation can never fail, it used to be able to */
 	return 0;
 }
diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c
index dd2fb75..3ecfc4e 100644
--- a/res/parking/parking_applications.c
+++ b/res/parking/parking_applications.c
@@ -954,6 +954,9 @@
 		return -1;
 	}
 
+	stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type());
+	stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type());
+
 	/* Now for the fun part... park it! */
 	ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0);
 
diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c
index cbc23fa..d8252a7 100644
--- a/res/parking/parking_bridge_features.c
+++ b/res/parking/parking_bridge_features.c
@@ -213,6 +213,7 @@
 	if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
 		return -1;
 	}
+	stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type());
 
 	datastore->data = parked_datastore;
 
diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c
index 6d0a4c0..764f39b 100644
--- a/res/parking/parking_manager.c
+++ b/res/parking/parking_manager.c
@@ -686,6 +686,9 @@
 {
 	if (!parking_sub) {
 		parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL);
+		if (parking_sub) {
+			stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type());
+		}
 	}
 }
 
diff --git a/res/res_hep_rtcp.c b/res/res_hep_rtcp.c
index c3abbc1..85ad9b5 100644
--- a/res/res_hep_rtcp.c
+++ b/res/res_hep_rtcp.c
@@ -167,6 +167,8 @@
 	if (!stasis_rtp_subscription) {
 		return AST_MODULE_LOAD_DECLINE;
 	}
+	stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type());
+	stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type());
 
 	return AST_MODULE_LOAD_SUCCESS;
 }
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index 4cd892c..a434f48 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -269,6 +269,11 @@
 		ao2_ref(mwi_sub, -1);
 		mwi_stasis_sub = NULL;
 	}
+	if (mwi_stasis_sub) {
+		/* mwi_stasis_sub is reset to NULL just above when subscribing failed */
+		stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type());
+		stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type());
+	}
 	return mwi_stasis_sub;
 }
 
diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c
index 0d815ad..b38096c 100644
--- a/res/res_pjsip_outbound_registration.c
+++ b/res/res_pjsip_outbound_registration.c
@@ -2282,6 +2282,9 @@
 
 	network_change_sub = stasis_subscribe(ast_system_topic(),
 		network_change_stasis_cb, NULL);
+	if (network_change_sub) {
+		stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type());
+	}
 
 	return AST_MODULE_LOAD_SUCCESS;
 }
diff --git a/res/res_pjsip_publish_asterisk.c b/res/res_pjsip_publish_asterisk.c
index 220ba0b..841b827 100644
--- a/res/res_pjsip_publish_asterisk.c
+++ b/res/res_pjsip_publish_asterisk.c
@@ -360,6 +360,8 @@
 		ao2_ref(datastore, -1);
 		return -1;
 	}
+	stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type());
+	stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type());
 
 	cached = stasis_cache_dump(ast_device_state_cache(), NULL);
 	ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore);
@@ -435,6 +437,8 @@
 		ao2_ref(datastore, -1);
 		return -1;
 	}
+	stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type());
+	stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type());
 
 	cached = stasis_cache_dump(ast_mwi_state_cache(), NULL);
 	ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore);
diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c
index 1e6ca7f..5e9666a 100644
--- a/res/res_pjsip_refer.c
+++ b/res/res_pjsip_refer.c
@@ -686,6 +686,8 @@
 			ast_channel_unlock(chan);
 
 			ao2_cleanup(refer->progress);
+		} else {
+			stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type());
 		}
 	}
 
diff --git a/res/res_security_log.c b/res/res_security_log.c
index 555ba23..a50a8c8 100644
--- a/res/res_security_log.c
+++ b/res/res_security_log.c
@@ -141,6 +141,7 @@
 		LOG_SECURITY = -1;
 		return AST_MODULE_LOAD_DECLINE;
 	}
+	stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type());
 
 	ast_verb(3, "Security Logging Enabled\n");
 
diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c
index be09b15..bbe3f27 100644
--- a/res/res_stasis_device_state.c
+++ b/res/res_stasis_device_state.c
@@ -394,6 +394,8 @@
 		ao2_ref(sub, -1);
 		return -1;
 	}
+	stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());
+	stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());
 
 	ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
 	ao2_unlock(device_state_subscriptions);
diff --git a/res/res_xmpp.c b/res/res_xmpp.c
index b72581f..e51d7d2 100644
--- a/res/res_xmpp.c
+++ b/res/res_xmpp.c
@@ -1626,11 +1626,13 @@
 	if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
 		return;
 	}
+	stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type());
 
 	if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
 		client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
 		return;
 	}
+	stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type());
 
 	cached = stasis_cache_dump(ast_device_state_cache(), NULL);
 	ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);

-- 
To view, visit https://gerrit.asterisk.org/10479
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I99bee23895baa0a117985d51683f7963b77aa190
Gerrit-Change-Number: 10479
Gerrit-PatchSet: 1
Gerrit-Owner: Joshua Colp <jcolp at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181014/561a4ad5/attachment-0001.html>


More information about the asterisk-code-review mailing list