[Asterisk-code-review] mwi: Update the MWI core to use stasis_state API (...asterisk[master])

Kevin Harwell asteriskteam at digium.com
Fri Jul 12 09:18:15 CDT 2019


Kevin Harwell has submitted this change and it was merged. ( https://gerrit.asterisk.org/c/asterisk/+/11463 )

Change subject: mwi: Update the MWI core to use stasis_state API
......................................................................

mwi: Update the MWI core to use stasis_state API

** Note **

This patch is meant to be the minimum needed for the MWI core to use the
underlying stasis_state module. As such, it does not completely remove the
core's reliance on the stasis_cache. Keeping that reliance means current
consumers, and their code paths, do not have to change for this patch. When
time allows, subsequent patches can be made to those consumers to take
advantage of the new MWI API included here, eventually removing the MWI
dependency on the stasis_cache.

** End Note **

This patch makes the MWI core take advantage of the new stasis_state API.
Consumers of MWI should no longer need to depend directly upon stasis topic
pooling and the stasis cache. Similar functionality and implementation details
have been pushed into the stasis_state module. However, all MWI state should
be accessed via the MWI API itself.

As such, a few new methods and constructs have been added to the MWI core that
facilitate consumer publishing, subscribing, and iterating over MWI state data.

* ast_mwi_subscriber *

Created via ast_mwi_add_subscriber, a subscriber subscribes to a given mailbox
in order to receive updates about that mailbox. Adding a subscriber creates the
underlying topic and associated state data if they do not already exist. The
topic and last known state data are guaranteed to exist for the lifetime of the
subscriber.
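
The lifetime rule described here can be pictured with a small stand-alone
model. This is only a sketch: every toy_ name, the fixed-size table, and the
plain integer reference count are assumptions made for illustration. It is not
the stasis_state implementation, which uses ao2 objects.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define TOY_MAX_MAILBOXES 8

/* Hypothetical stand-in for a mailbox's topic and state pair. */
struct toy_state {
	char mailbox[32];
	int refs; /* number of live subscribers; 0 means the slot is free */
};

static struct toy_state toy_table[TOY_MAX_MAILBOXES];

/* Return the live entry for a mailbox, or NULL if none exists. */
static struct toy_state *toy_find(const char *mailbox)
{
	size_t i;

	for (i = 0; i < TOY_MAX_MAILBOXES; ++i) {
		if (toy_table[i].refs > 0 && !strcmp(toy_table[i].mailbox, mailbox)) {
			return &toy_table[i];
		}
	}
	return NULL;
}

/* Create the entry on first use, otherwise just add a reference. */
static struct toy_state *toy_add_subscriber(const char *mailbox)
{
	struct toy_state *state = toy_find(mailbox);
	size_t i;

	if (state) {
		state->refs++;
		return state;
	}
	for (i = 0; i < TOY_MAX_MAILBOXES; ++i) {
		if (toy_table[i].refs == 0) {
			snprintf(toy_table[i].mailbox, sizeof(toy_table[i].mailbox), "%s", mailbox);
			toy_table[i].refs = 1;
			return &toy_table[i];
		}
	}
	return NULL; /* table full */
}

/* Drop one reference; the entry disappears with the last subscriber. */
static void toy_remove_subscriber(struct toy_state *state)
{
	assert(state != NULL && state->refs > 0);
	state->refs--;
}
```

In this model two subscribers to the same mailbox share one entry, and the
entry stays findable until the second one is removed.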

* ast_mwi_publisher *

Before publishing to a particular topic, a publisher should be created using
ast_mwi_add_publisher. Publishing to a mailbox should then be done using one of
the MWI publish functions. This ensures the message is published to the
appropriate topic and the last known state is maintained.
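
As a rough illustration of why publishing goes through a publisher handle, the
sketch below binds a handle to one mailbox's state and updates the last known
counts on every publish. The toy_ types and functions are hypothetical
stand-ins, not the MWI API; topic delivery is left out entirely.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the last known state kept per mailbox. */
struct toy_last_state {
	int urgent_msgs;
	int new_msgs;
	int old_msgs;
	int published; /* 0 until something has been published */
};

/* Hypothetical publisher handle, bound to exactly one mailbox's state. */
struct toy_publisher {
	const char *mailbox;
	struct toy_last_state *state;
};

/* Bind a publisher to a mailbox before anything is published. */
static struct toy_publisher toy_add_publisher(const char *mailbox,
	struct toy_last_state *state)
{
	struct toy_publisher pub = { .mailbox = mailbox, .state = state };

	assert(mailbox != NULL);
	return pub;
}

/* Publish through the handle so the last known state is always updated. */
static int toy_publish(struct toy_publisher *pub, int urgent_msgs,
	int new_msgs, int old_msgs)
{
	if (!pub || !pub->state || urgent_msgs < 0 || new_msgs < 0 || old_msgs < 0) {
		return -1;
	}

	pub->state->urgent_msgs = urgent_msgs;
	pub->state->new_msgs = new_msgs;
	pub->state->old_msgs = old_msgs;
	pub->state->published = 1;
	return 0;
}
```

Because the handle already knows its mailbox, a caller in this model cannot
publish to the wrong entry or skip the state update.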

* ast_mwi_observer *

Add an observer in order to watch for particular MWI module related events. For
instance, if a submodule needs to know when a subscription is added to any
mailbox, an observer can be added to watch for that.
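
The observer idea can be sketched as a struct of optional callbacks that a core
walks whenever an event occurs. The toy_ registry below is an assumption made
for illustration only; the real observer list lives in the stasis_state module
and is not shown in this patch.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_OBSERVERS 4

/* Hypothetical observer shape: one optional callback per event. */
struct toy_observer {
	void (*on_subscribe)(const char *mailbox, void *sub);
	void (*on_unsubscribe)(const char *mailbox, void *sub);
};

static const struct toy_observer *toy_observers[TOY_MAX_OBSERVERS];

/* Register an observer; returns 0 on success, -1 if the list is full. */
static int toy_add_observer(const struct toy_observer *obs)
{
	size_t i;

	assert(obs != NULL);
	for (i = 0; i < TOY_MAX_OBSERVERS; ++i) {
		if (!toy_observers[i]) {
			toy_observers[i] = obs;
			return 0;
		}
	}
	return -1;
}

/* Unregister an observer; unknown observers are ignored. */
static void toy_remove_observer(const struct toy_observer *obs)
{
	size_t i;

	for (i = 0; i < TOY_MAX_OBSERVERS; ++i) {
		if (toy_observers[i] == obs) {
			toy_observers[i] = NULL;
		}
	}
}

/* What a core would call when a subscriber is added to any mailbox. */
static void toy_notify_subscribe(const char *mailbox, void *sub)
{
	size_t i;

	for (i = 0; i < TOY_MAX_OBSERVERS; ++i) {
		if (toy_observers[i] && toy_observers[i]->on_subscribe) {
			toy_observers[i]->on_subscribe(mailbox, sub);
		}
	}
}

/* Example observer callback: counts subscriptions across all mailboxes. */
static int toy_subscribe_count;

static void toy_count_subscribe(const char *mailbox, void *sub)
{
	(void) mailbox;
	(void) sub;
	toy_subscribe_count++;
}
```

An observer that only cares about one event leaves the other callback NULL, and
it stops receiving events as soon as it is removed.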

* other *

The urgent message count is now part of the published MWI state object. State
can also be iterated over using defined callbacks.
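
The callback-driven iteration can be modelled as follows. The handler contract
(return 0 to continue, 2 to stop) follows the on_mwi_state documentation in
mwi.h; everything else, including the toy_ names and the array standing in for
the managed state, is a hypothetical sketch and not the actual traversal code.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_CMP_STOP 2 /* mirrors the CMP_STOP (2) value named in mwi.h */

/* Hypothetical stand-in for the published MWI state of one mailbox. */
struct toy_mwi_state {
	const char *uniqueid;
	int urgent_msgs;
	int new_msgs;
	int old_msgs;
};

/* Handler contract: return 0 to keep traversing, TOY_CMP_STOP to stop. */
typedef int (*toy_on_state)(struct toy_mwi_state *state, void *data);

/* Call the handler for each mailbox until it asks to stop. */
static void toy_state_callback_all(struct toy_mwi_state *states, size_t count,
	toy_on_state handler, void *data)
{
	size_t i;

	assert(handler != NULL);
	for (i = 0; i < count; ++i) {
		if (handler(&states[i], data) == TOY_CMP_STOP) {
			break;
		}
	}
}

/* Example handler: add up the urgent message counts of every mailbox. */
static int toy_sum_urgent(struct toy_mwi_state *state, void *data)
{
	int *total = data;

	*total += state->urgent_msgs;
	return 0;
}

/* Example handler: remember the first mailbox with urgent mail, then stop. */
static int toy_first_urgent(struct toy_mwi_state *state, void *data)
{
	const char **found = data;

	if (state->urgent_msgs > 0) {
		*found = state->uniqueid;
		return TOY_CMP_STOP;
	}
	return 0;
}
```

The user data pointer carries the accumulator or result slot, so handlers in
this model need no global state.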

ASTERISK-28442

Change-Id: I93f935f9090cd5ddff6d4bc80ff90703c05cf776
---
M include/asterisk/mwi.h
M main/mwi.c
A tests/test_mwi.c
3 files changed, 965 insertions(+), 52 deletions(-)

Approvals:
  Kevin Harwell: Looks good to me, approved
  Friendly Automation: Approved for Submit



diff --git a/include/asterisk/mwi.h b/include/asterisk/mwi.h
index 1502224..3ce2b06 100644
--- a/include/asterisk/mwi.h
+++ b/include/asterisk/mwi.h
@@ -19,7 +19,77 @@
 #ifndef _ASTERISK_MWI_H
 #define _ASTERISK_MWI_H
 
+/*! \file
+ *
+ * \brief Asterisk MWI API.
+ *
+ * \par Intro
+ *
+ * This module manages, and processes all things MWI. Defined are mechanisms for subscribing
+ * and publishing to MWI topics. User modules wishing to receive MWI updates for a particular
+ * mailbox should do so by adding an MWI subscriber to that mailbox, followed by subscribing
+ * to the mailbox's topic. Likewise, user modules that want to publish MWI updates about a
+ * particular mailbox should first add a publisher for that mailbox prior to publishing.
+ *
+ * MWI state is managed via an underlying \ref stasis_state_manager (if interested see the
+ * stasis_state.c module for the gory details). As such all last known mailbox state can be
+ * retrieved and iterated over by using the \ref ast_mwi_state_callback_all function.
+ *
+ * \par ast_mwi_subscriber
+ *
+ * Created via \ref ast_mwi_add_subscriber, a subscriber subscribes to a given mailbox in
+ * order to receive updates about the given mailbox. Adding a subscriber will create the
+ * underlying topic, and associated state data if those do not already exist for it. The
+ * topic and last known state data are guaranteed to exist for the lifetime of the subscriber.
+ * State data can be NULL if nothing has been published to the mailbox's topic yet.
+ *
+ * NOTE, currently adding a subscriber here will either create, or add a reference to the
+ * underlying stasis state (and associated topic). However, it does not actually subscribe to
+ * the stasis topic itself. You still need to explicitly call \ref stasis_subscribe, or
+ * similar on the topic if you wish to receive published event updates.
+ *
+ * Given that, the following order should be adhered to when subscribing to an MWI topic:
+ *
+ *   1. Add an MWI state subscriber using \ref ast_mwi_add_subscriber
+ *   2. Retrieve the topic from the subscriber using \ref ast_mwi_subscriber_topic
+ *   3. Subscribe to the topic itself using \ref stasis_subscribe or \ref stasis_subscribe_pool
+ *
+ * Or simply call \ref ast_mwi_subscribe_pool, which combines those steps into a single call and
+ * returns the subscriber that is now subscribed to both the stasis topic and state.
+ *
+ * Similarly, releasing the subscriber's reference removes a reference to the underlying state,
+ * but does not unsubscribe from the MWI topic. This should be done separately and prior to
+ * removing the subscriber's state reference:
+ *
+ *   1. Unsubscribe from the stasis topic subscription using \ref stasis_unsubscribe or
+ *      \ref stasis_unsubscribe_and_join
+ *   2. Remove the MWI subscriber reference
+ *
+ * Or call \ref ast_mwi_unsubscribe (or _and_join), which combines those two steps into a
+ * single call.
+ *
+ * \par ast_mwi_publisher
+ *
+ * Before publishing to a particular topic a publisher should be created. This can be achieved
+ * by using \ref ast_mwi_add_publisher. Publishing to a mailbox should then be done using the
+ * \ref ast_mwi_publish function. This ensures the message is published to the appropriate
+ * topic, and the last known state is maintained.
+ *
+ * Publishing by mailbox id alone is also allowed. However, it is not recommended, and
+ * exists mainly for backwards compatibility and legacy subsystems. If this method of
+ * publishing is employed, one of the \ref ast_delete_mwi_state functions should also be
+ * called for a given mailbox id once no more publishing will be done for that id.
+ * Otherwise a memory leak on the underlying stasis_state object will occur.
+ *
+ * \par ast_mwi_observer
+ *
+ * Add an observer in order to watch for particular MWI module related events. For instance if
+ * a submodule needs to know when a subscription is added to any mailbox an observer can be
+ * added to watch for that.
+ */
+
 #include "asterisk/utils.h"
+#include "asterisk/stasis_state.h"
 
 #if defined(__cplusplus) || defined(c_plusplus)
 extern "C" {
@@ -27,6 +97,273 @@
 
 struct ast_json;
 struct stasis_message_type;
+struct ast_mwi_state;
+
+/*!
+ * \brief An MWI state subscriber
+ *
+ * An ao2 managed object. Holds a reference to the latest MWI state for its lifetime.
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct ast_mwi_subscriber;
+
+/*!
+ * \brief Add an MWI state subscriber to the mailbox
+ *
+ * Adding a subscriber to a mailbox will create a stasis topic for the mailbox if one
+ * does not already exist. It does not however subscribe to the topic itself. This is
+ * done separately using a call to \ref stasis_subscribe or \ref stasis_subscribe_pool.
+ *
+ * A subscriber can be removed by releasing its reference. Doing so releases its underlying
+ * reference to the MWI state. It does not unsubscribe from the topic. Unsubscribing from
+ * a topic should be done prior to unsubscribing the state.
+ *
+ * \param mailbox The subscription state mailbox id
+ *
+ * \retval An MWI subscriber object
+ * \retval NULL on error
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct ast_mwi_subscriber *ast_mwi_add_subscriber(const char *mailbox);
+
+/*!
+ * \brief Add an MWI state subscriber, and stasis subscription to the mailbox
+ *
+ * Adding a subscriber to a mailbox will create a stasis topic for the mailbox if one
+ * does not already exist. Once successfully created, the underlying stasis topic is then
+ * subscribed to as well.
+ *
+ * A subscriber can be removed by releasing its reference. Doing so releases its underlying
+ * reference to the MWI state. It does not unsubscribe from the topic. Unsubscribing from
+ * a topic should be done prior to unsubscribing the state.
+ *
+ * \param mailbox The subscription state mailbox id
+ * \param callback The stasis subscription callback
+ * \param data A user data object passed to the stasis subscription
+ *
+ * \retval An MWI subscriber object
+ * \retval NULL on error
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct ast_mwi_subscriber *ast_mwi_subscribe_pool(const char *mailbox,
+	stasis_subscription_cb callback, void *data);
+
+/*!
+ * \brief Unsubscribe from the stasis topic and MWI.
+ *
+ * \param sub An MWI subscriber
+ *
+ * \retval NULL
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+void *ast_mwi_unsubscribe(struct ast_mwi_subscriber *sub);
+
+/*!
+ * \brief Unsubscribe from the stasis topic, block until the final message
+ * is received, and then unsubscribe from MWI.
+ *
+ * \param sub An MWI subscriber
+ *
+ * \retval NULL
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+void *ast_mwi_unsubscribe_and_join(struct ast_mwi_subscriber *sub);
+
+/*!
+ * \brief Retrieves the MWI subscriber's topic
+ *
+ * \note Returned topic's reference count is NOT incremented. However, the topic is
+ * guaranteed to live for the lifetime of the subscriber.
+ *
+ * \param sub An MWI subscriber
+ *
+ * \retval A stasis topic subscribed to by the subscriber
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct stasis_topic *ast_mwi_subscriber_topic(struct ast_mwi_subscriber *sub);
+
+/*!
+ * \brief Retrieves the state data object associated with the MWI subscriber
+ *
+ * \note Returned data's reference count is incremented
+ *
+ * \param sub An MWI subscriber
+ *
+ * \retval The state data object
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct ast_mwi_state *ast_mwi_subscriber_data(struct ast_mwi_subscriber *sub);
+
+/*!
+ * \brief Retrieve the stasis MWI topic subscription if available.
+ *
+ * \param sub An MWI subscriber
+ *
+ * \retval The subscriber's stasis subscription
+ * \retval NULL if no subscription available
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct stasis_subscription *ast_mwi_subscriber_subscription(struct ast_mwi_subscriber *sub);
+
+/*!
+ * \brief An MWI state publisher
+ *
+ * An ao2 managed object. Holds a reference to the latest MWI state for its lifetime.
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct ast_mwi_publisher;
+
+/*!
+ * \brief Add an MWI state publisher to the mailbox
+ *
+ * Adding a publisher to a mailbox will create a stasis topic for the mailbox if one
+ * does not already exist. A publisher can be removed by releasing its reference. Doing
+ * so releases its underlying reference to the MWI state.
+ *
+ * \param mailbox The mailbox id to publish to
+ *
+ * \retval An MWI publisher object
+ * \retval NULL on error
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+struct ast_mwi_publisher *ast_mwi_add_publisher(const char *mailbox);
+
+/*! \brief MWI state event interface */
+struct ast_mwi_observer {
+	/*!
+	 * \brief Raised when MWI is being subscribed
+	 *
+	 * \param mailbox The mailbox id subscribed
+	 * \param sub The subscriber subscribed
+	 */
+	void (*on_subscribe)(const char *mailbox, struct ast_mwi_subscriber *sub);
+
+	/*!
+	 * \brief Raised when MWI is being unsubscribed
+	 *
+	 * \param mailbox The mailbox id being unsubscribed
+	 * \param sub The subscriber to unsubscribe
+	 */
+	void (*on_unsubscribe)(const char *mailbox, struct ast_mwi_subscriber *sub);
+};
+
+/*!
+ * \brief Add an observer to receive MWI state related events.
+ *
+ * \param observer The observer handling events
+ *
+ * \retval 0 if successfully registered, -1 otherwise
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+int ast_mwi_add_observer(struct ast_mwi_observer *observer);
+
+/*!
+ * \brief Remove an MWI state observer.
+ *
+ * \param observer The observer being removed
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+void ast_mwi_remove_observer(struct ast_mwi_observer *observer);
+
+/*!
+ * \brief The delegate called for each managed mailbox state.
+ *
+ * \param mwi_state The mwi state object
+ * \param data User data passed in by the initiator
+ *
+ * \retval 0 to continue traversing, or CMP_STOP (2) to stop traversing
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+typedef int (*on_mwi_state)(struct ast_mwi_state *mwi_state, void *data);
+
+/*!
+ * \brief For each managed mailbox call the given handler.
+ *
+ * \param handler The mwi state handler to call for each managed mailbox
+ * \param data User data to pass on to the handler
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+void ast_mwi_state_callback_all(on_mwi_state handler, void *data);
+
+/*!
+ * \brief For each managed mailbox that has a subscriber call the given handler.
+ *
+ * \param handler The mwi state handler to call for each managed mailbox
+ * \param data User data to pass on to the handler
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+void ast_mwi_state_callback_subscribed(on_mwi_state handler, void *data);
+
+/*!
+ * \brief Publish MWI for the given mailbox.
+ *
+ * \param publisher The publisher to publish a mailbox update on
+ * \param urgent_msgs The number of urgent messages in this mailbox
+ * \param new_msgs The number of new messages in this mailbox
+ * \param old_msgs The number of old messages in this mailbox
+ * \param channel_id A unique identifier for a channel associated with this
+ *        change in mailbox state
+ * \param eid The EID of the server that originally published the message
+ *
+ * \retval 0 on success
+ * \retval -1 on failure
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+int ast_mwi_publish(struct ast_mwi_publisher *publisher, int urgent_msgs,
+	int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid);
+
+/*!
+ * \brief Publish MWI for the given mailbox.
+ *
+ * \param mailbox The mailbox identifier string.
+ * \param context The context this mailbox resides in (NULL or "" if only using mailbox)
+ * \param urgent_msgs The number of urgent messages in this mailbox
+ * \param new_msgs The number of new messages in this mailbox
+ * \param old_msgs The number of old messages in this mailbox
+ * \param channel_id A unique identifier for a channel associated with this
+ *        change in mailbox state
+ * \param eid The EID of the server that originally published the message
+ *
+ * \retval 0 on success
+ * \retval -1 on failure
+ *
+ * \since 13.28.0
+ * \since 16.5.0
+ */
+int ast_mwi_publish_by_mailbox(const char *mailbox, const char *context, int urgent_msgs,
+	int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid);
 
 /*!
  * \since 12
@@ -126,6 +463,7 @@
 	/*! If applicable, a snapshot of the channel that caused this MWI change */
 	struct ast_channel_snapshot *snapshot;
 	struct ast_eid eid;              /*!< The EID of the server where this message originated */
+	int urgent_msgs;                 /*!< The current number of urgent messages for this mailbox */
 };
 
 /*!
@@ -228,7 +566,7 @@
  * \retval 0 Success
  * \retval -1 Failure
  *
- * \since 13.26.0
+ * \since 13.27.0
  * \since 16.4.0
  */
 int mwi_init(void);
diff --git a/main/mwi.c b/main/mwi.c
index 43b4e04..e81766c 100644
--- a/main/mwi.c
+++ b/main/mwi.c
@@ -22,16 +22,16 @@
 
 #include "asterisk.h"
 
+#include "asterisk/app.h"
 #include "asterisk/mwi.h"
 #include "asterisk/stasis_channels.h"
 
 /*
  * @{ \brief Define \ref stasis topic objects
  */
-static struct stasis_topic *mwi_topic_all;
+static struct stasis_state_manager *mwi_state_manager;
 static struct stasis_cache *mwi_state_cache;
 static struct stasis_caching_topic *mwi_topic_cached;
-static struct stasis_topic_pool *mwi_topic_pool;
 /* @} */
 
 /*! \brief Convert a MWI \ref stasis_message to a \ref ast_event */
@@ -84,7 +84,7 @@
 
 struct stasis_topic *ast_mwi_topic_all(void)
 {
-	return mwi_topic_all;
+	return stasis_state_all_topic(mwi_state_manager);
 }
 
 struct stasis_cache *ast_mwi_state_cache(void)
@@ -99,10 +99,11 @@
 
 struct stasis_topic *ast_mwi_topic(const char *uniqueid)
 {
-	return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid);
+	return stasis_state_topic(mwi_state_manager, uniqueid);
 }
 
-struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
+static struct ast_mwi_state *mwi_create_state(const char *mailbox, const char *context,
+	int urgent_msgs, int new_msgs, int old_msgs)
 {
 	struct ast_mwi_state *mwi_state;
 
@@ -110,10 +111,14 @@
 
 	mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor);
 	if (!mwi_state) {
+		ast_log(LOG_ERROR, "Unable to create MWI state for mailbox '%s@%s'\n",
+				mailbox, ast_strlen_zero(context) ? "" : context);
 		return NULL;
 	}
 
 	if (ast_string_field_init(mwi_state, 256)) {
+		ast_log(LOG_ERROR, "Unable to initialize MWI state for mailbox '%s@%s'\n",
+				mailbox, ast_strlen_zero(context) ? "" : context);
 		ao2_ref(mwi_state, -1);
 		return NULL;
 	}
@@ -123,9 +128,28 @@
 		ast_string_field_set(mwi_state, uniqueid, mailbox);
 	}
 
+	mwi_state->urgent_msgs = urgent_msgs;
+	mwi_state->new_msgs = new_msgs;
+	mwi_state->old_msgs = old_msgs;
+
 	return mwi_state;
 }
 
+static struct ast_mwi_state *mwi_retrieve_then_create_state(const char *mailbox)
+{
+	int urgent_msgs;
+	int new_msgs;
+	int old_msgs;
+
+	ast_app_inboxcount2(mailbox, &urgent_msgs, &new_msgs, &old_msgs);
+	return mwi_create_state(mailbox, NULL, urgent_msgs, new_msgs, old_msgs);
+}
+
+struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
+{
+	return mwi_create_state(mailbox, context, 0, 0, 0);
+}
+
 /*!
  * \internal
  * \brief Create a MWI state snapshot message.
@@ -145,6 +169,7 @@
 static struct stasis_message *mwi_state_create_message(
 	const char *mailbox,
 	const char *context,
+	int urgent_msgs,
 	int new_msgs,
 	int old_msgs,
 	const char *channel_id,
@@ -157,14 +182,11 @@
 		return NULL;
 	}
 
-	mwi_state = ast_mwi_create(mailbox, context);
+	mwi_state = mwi_create_state(mailbox, context, urgent_msgs, new_msgs, old_msgs);
 	if (!mwi_state) {
 		return NULL;
 	}
 
-	mwi_state->new_msgs = new_msgs;
-	mwi_state->old_msgs = old_msgs;
-
 	if (!ast_strlen_zero(channel_id)) {
 		mwi_state->snapshot = ast_channel_snapshot_get_latest(channel_id);
 	}
@@ -186,6 +208,183 @@
 	return message;
 }
 
+/*!
+ * \internal
+ *
+ * This object currently acts as a typedef, but can also be thought of as a "child" object
+ * of the stasis_state_subscriber type. As such the "base" pointer should always be the
+ * first object attribute. Doing so allows this object to be easily type cast and used by
+ * the stasis_state code.
+ */
+struct ast_mwi_subscriber {
+	/*! The "base" state subscriber. (Must be first object attribute) */
+	struct stasis_state_subscriber *base;
+};
+
+struct ast_mwi_subscriber *ast_mwi_add_subscriber(const char *mailbox)
+{
+	return (struct ast_mwi_subscriber *)stasis_state_add_subscriber(
+		mwi_state_manager, mailbox);
+}
+
+struct ast_mwi_subscriber *ast_mwi_subscribe_pool(const char *mailbox,
+	stasis_subscription_cb callback, void *data)
+{
+	struct stasis_subscription *stasis_sub;
+	struct ast_mwi_subscriber *sub = (struct ast_mwi_subscriber *)stasis_state_subscribe_pool(
+		mwi_state_manager, mailbox, callback, data);
+
+	if (!sub) {
+		return NULL;
+	}
+
+	stasis_sub = ast_mwi_subscriber_subscription(sub);
+
+	stasis_subscription_accept_message_type(stasis_sub, ast_mwi_state_type());
+	stasis_subscription_set_filter(stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
+
+	return sub;
+}
+
+void *ast_mwi_unsubscribe(struct ast_mwi_subscriber *sub)
+{
+	return stasis_state_unsubscribe((struct stasis_state_subscriber *)sub);
+}
+
+void *ast_mwi_unsubscribe_and_join(struct ast_mwi_subscriber *sub)
+{
+	return stasis_state_unsubscribe_and_join((struct stasis_state_subscriber *)sub);
+}
+
+struct stasis_topic *ast_mwi_subscriber_topic(struct ast_mwi_subscriber *sub)
+{
+	return stasis_state_subscriber_topic((struct stasis_state_subscriber *)sub);
+}
+
+struct ast_mwi_state *ast_mwi_subscriber_data(struct ast_mwi_subscriber *sub)
+{
+	struct stasis_state_subscriber *s = (struct stasis_state_subscriber *)sub;
+	struct ast_mwi_state *mwi_state = stasis_state_subscriber_data(s);
+
+	return mwi_state ?: mwi_retrieve_then_create_state(stasis_state_subscriber_id(s));
+}
+
+struct stasis_subscription *ast_mwi_subscriber_subscription(struct ast_mwi_subscriber *sub)
+{
+	return stasis_state_subscriber_subscription((struct stasis_state_subscriber *)sub);
+}
+
+/*!
+ * \internal
+ *
+ * This object currently acts as a typedef, but can also be thought of as a "child" object
+ * of the stasis_state_publisher type. As such the "base" pointer should always be the
+ * first object attribute. Doing so allows this object to be easily type cast and used by
+ * the stasis_state code.
+ */
+struct ast_mwi_publisher {
+	/*! The "base" state publisher. (Must be first object attribute) */
+	struct stasis_state_publisher *base;
+};
+
+struct ast_mwi_publisher *ast_mwi_add_publisher(const char *mailbox)
+{
+	return (struct ast_mwi_publisher *)stasis_state_add_publisher(
+		mwi_state_manager, mailbox);
+}
+
+int ast_mwi_add_observer(struct ast_mwi_observer *observer)
+{
+	return stasis_state_add_observer(mwi_state_manager,
+		(struct stasis_state_observer *)observer);
+}
+
+void ast_mwi_remove_observer(struct ast_mwi_observer *observer)
+{
+	stasis_state_remove_observer(mwi_state_manager,
+		(struct stasis_state_observer *)observer);
+}
+
+struct mwi_handler_data {
+	on_mwi_state handler;
+	void *data;
+};
+
+static int handle_mwi_state(const char *id, struct stasis_message *msg, void *user_data)
+{
+	struct mwi_handler_data *d = user_data;
+	struct ast_mwi_state *mwi_state = stasis_message_data(msg);
+	int res;
+
+	if (mwi_state) {
+		return d->handler(mwi_state, d->data);
+	}
+
+	mwi_state = mwi_create_state(id, NULL, 0, 0, 0);
+	if (!mwi_state) {
+		return 0;
+	}
+
+	res = d->handler(mwi_state, d->data);
+	ao2_ref(mwi_state, -1);
+	return res;
+}
+
+void ast_mwi_state_callback_all(on_mwi_state handler, void *data)
+{
+	struct mwi_handler_data d = {
+		.handler = handler,
+		.data = data
+	};
+
+	stasis_state_callback_all(mwi_state_manager, handle_mwi_state, &d);
+}
+
+void ast_mwi_state_callback_subscribed(on_mwi_state handler, void *data)
+{
+	struct mwi_handler_data d = {
+		.handler = handler,
+		.data = data
+	};
+
+	stasis_state_callback_subscribed(mwi_state_manager, handle_mwi_state, &d);
+}
+
+int ast_mwi_publish(struct ast_mwi_publisher *pub, int urgent_msgs,
+	int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
+{
+	struct stasis_state_publisher *p = (struct stasis_state_publisher *)pub;
+	struct stasis_message *msg = mwi_state_create_message(stasis_state_publisher_id(p),
+		NULL, urgent_msgs, new_msgs, old_msgs, channel_id, eid);
+
+	if (!msg) {
+		return -1;
+	}
+
+	stasis_state_publish(p, msg);
+	ao2_ref(msg, -1);
+
+	return 0;
+}
+
+int ast_mwi_publish_by_mailbox(const char *mailbox, const char *context, int urgent_msgs,
+	int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
+{
+	struct ast_mwi_state *mwi_state;
+	struct stasis_message *msg = mwi_state_create_message(
+		mailbox, context, urgent_msgs, new_msgs, old_msgs, channel_id, eid);
+
+	if (!msg) {
+		return -1;
+	}
+
+	mwi_state = stasis_message_data(msg);
+	stasis_state_publish_by_id(mwi_state_manager, mwi_state->uniqueid, NULL, msg);
+	ao2_ref(msg, -1);
+
+	return 0;
+}
+
 int ast_publish_mwi_state_full(
 	const char *mailbox,
 	const char *context,
@@ -194,24 +393,7 @@
 	const char *channel_id,
 	struct ast_eid *eid)
 {
-	struct ast_mwi_state *mwi_state;
-	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
-	struct stasis_topic *mailbox_specific_topic;
-
-	message = mwi_state_create_message(mailbox, context, new_msgs, old_msgs, channel_id, eid);
-	if (!message) {
-		return -1;
-	}
-
-	mwi_state = stasis_message_data(message);
-	mailbox_specific_topic = ast_mwi_topic(mwi_state->uniqueid);
-	if (!mailbox_specific_topic) {
-		return -1;
-	}
-
-	stasis_publish(mailbox_specific_topic, message);
-
-	return 0;
+	return ast_mwi_publish_by_mailbox(mailbox, context, 0, new_msgs, old_msgs, channel_id, eid);
 }
 
 int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct ast_eid *eid)
@@ -220,9 +402,8 @@
 	struct stasis_message *cached_msg;
 	struct stasis_message *clear_msg;
 	struct ast_mwi_state *mwi_state;
-	struct stasis_topic *mailbox_specific_topic;
 
-	msg = mwi_state_create_message(mailbox, context, 0, 0, NULL, eid);
+	msg = mwi_state_create_message(mailbox, context, 0, 0, 0, NULL, eid);
 	if (!msg) {
 		return -1;
 	}
@@ -244,22 +425,16 @@
 	cached_msg = stasis_cache_get_by_eid(ast_mwi_state_cache(),
 		ast_mwi_state_type(), mwi_state->uniqueid, &ast_eid_default);
 	if (!cached_msg) {
-		/* Nothing to clear */
+		/* Nothing to clear from the cache, but still need to remove state */
+		stasis_state_remove_publish_by_id(mwi_state_manager, mwi_state->uniqueid, eid, NULL);
 		return -1;
 	}
 	ao2_cleanup(cached_msg);
 
-	mailbox_specific_topic = ast_mwi_topic(mwi_state->uniqueid);
-	if (!mailbox_specific_topic) {
-		return -1;
-	}
-
 	clear_msg = stasis_cache_clear_create(msg);
-	if (clear_msg) {
-		stasis_publish(mailbox_specific_topic, clear_msg);
-	}
-
+	stasis_state_remove_publish_by_id(mwi_state_manager, mwi_state->uniqueid, eid, clear_msg);
 	ao2_cleanup(clear_msg);
+
 	return 0;
 }
 
@@ -315,13 +490,11 @@
 
 static void mwi_cleanup(void)
 {
-	ao2_cleanup(mwi_topic_pool);
-	mwi_topic_pool = NULL;
-	ao2_cleanup(mwi_topic_all);
-	mwi_topic_all = NULL;
 	ao2_cleanup(mwi_state_cache);
 	mwi_state_cache = NULL;
 	mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
+	ao2_cleanup(mwi_state_manager);
+	mwi_state_manager = NULL;
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_mwi_state_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_mwi_vm_app_type);
 }
@@ -338,8 +511,8 @@
 		return -1;
 	}
 
-	mwi_topic_all = stasis_topic_create("mwi:all");
-	if (!mwi_topic_all) {
+	mwi_state_manager = stasis_state_manager_create("mwi:all");
+	if (!mwi_state_manager) {
 		return -1;
 	}
 
@@ -348,15 +521,10 @@
 		return -1;
 	}
 
-	mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
+	mwi_topic_cached = stasis_caching_topic_create(ast_mwi_topic_all(), mwi_state_cache);
 	if (!mwi_topic_cached) {
 		return -1;
 	}
 
-	mwi_topic_pool = stasis_topic_pool_create(mwi_topic_all);
-	if (!mwi_topic_pool) {
-		return -1;
-	}
-
 	return 0;
 }
diff --git a/tests/test_mwi.c b/tests/test_mwi.c
new file mode 100644
index 0000000..3f633b3
--- /dev/null
+++ b/tests/test_mwi.c
@@ -0,0 +1,407 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2019, Sangoma Technologies Corporation
+ *
+ * Kevin Harwell <kharwell at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*** MODULEINFO
+	<depend>TEST_FRAMEWORK</depend>
+	<support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+#include "asterisk/astobj2.h"
+#include "asterisk/conversions.h"
+#include "asterisk/module.h"
+#include "asterisk/mwi.h"
+#include "asterisk/stasis.h"
+#include "asterisk/test.h"
+
+#define test_category "/mwi/"
+
+#define MAILBOX_PREFIX "test~" /* Hopefully sufficiently unlikely */
+#define MAILBOX_COUNT 500
+#define MAILBOX_SIZE 32
+
+AST_VECTOR(subscriptions, struct ast_mwi_subscriber *);
+AST_VECTOR(publishers, struct ast_mwi_publisher *);
+
+/*!
+ * For testing purposes each subscribed mailbox is a number. This value is
+ * the summation of all mailboxes.
+ */
+static size_t sum_total;
+
+/*! Test variable that tracks the running total of mailboxes */
+static size_t running_total;
+
+/*! This value is set to check if MWI data is zero before publishing */
+static int expect_zero;
+
+static int num_to_mailbox(char *mailbox, size_t size, size_t num)
+{
+	if (snprintf(mailbox, size, MAILBOX_PREFIX "%zu", num) < 0) {
+		ast_log(LOG_ERROR, "Unable to convert mailbox to string\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+static int mailbox_to_num(const char *mailbox, size_t *num)
+{
+	const char *p = strchr(mailbox, '~');
+
+	if (!p) {
+		ast_log(LOG_ERROR, "Prefix separator '~' not found in '%s'\n", mailbox);
+		return -1;
+	}
+
+	if (ast_str_to_umax(++p, num)) {
+		ast_log(LOG_ERROR, "Unable to convert mailbox '%s' to numeric\n", mailbox);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int validate_data(struct ast_mwi_state *mwi_state)
+{
+	size_t num;
+	size_t val;
+
+	if (mailbox_to_num(mwi_state->uniqueid, &num)) {
+		return -1;
+	}
+
+	running_total += num;
+
+	val = expect_zero ? 0 : num;
+
+	if (mwi_state->urgent_msgs != val || mwi_state->new_msgs != val ||
+			mwi_state->old_msgs != val) {
+		ast_log(LOG_ERROR, "Unexpected MWI state data for '%s', %d != %zu\n",
+				mwi_state->uniqueid, mwi_state->urgent_msgs, val);
+		return -1;
+	}
+
+	return num;
+}
+
+static void handle_validate(const char *mailbox, struct ast_mwi_subscriber *sub)
+{
+	struct ast_mwi_state *mwi_state = ast_mwi_subscriber_data(sub);
+
+	if (ast_begins_with(mwi_state->uniqueid, MAILBOX_PREFIX)) {
+		validate_data(mwi_state);
+	}
+
+	ao2_cleanup(mwi_state);
+}
+
+struct ast_mwi_observer mwi_observer = {
+	.on_subscribe = handle_validate,
+	.on_unsubscribe = handle_validate
+};
+
+static void mwi_type_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+	/* No op since we are not really testing stasis topic handling here */
+}
+
+static int subscriptions_destroy(struct subscriptions *subs)
+{
+	running_total = expect_zero = 0;
+
+	AST_VECTOR_CALLBACK_VOID(subs, ast_mwi_unsubscribe_and_join);
+	AST_VECTOR_FREE(subs);
+
+	ast_mwi_remove_observer(&mwi_observer);
+
+	if (running_total != sum_total) {
+		ast_log(LOG_ERROR, "Failed to destroy all MWI subscriptions: running=%zu, sum=%zu\n",
+				running_total, sum_total);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int subscriptions_create(struct subscriptions *subs)
+{
+	size_t i;
+
+	if (ast_mwi_add_observer(&mwi_observer) ||
+		AST_VECTOR_INIT(subs, MAILBOX_COUNT)) {
+		return -1;
+	}
+
+	sum_total = running_total = 0;
+	expect_zero = 1;
+
+	for (i = 0; i < MAILBOX_COUNT; ++i) {
+		struct ast_mwi_subscriber *sub;
+		char mailbox[MAILBOX_SIZE];
+
+		if (num_to_mailbox(mailbox, MAILBOX_SIZE, i)) {
+			break;
+		}
+
+		sub = ast_mwi_subscribe_pool(mailbox, mwi_type_cb, NULL);
+		if (!sub) {
+			ast_log(LOG_ERROR, "Failed to create a MWI subscriber for mailbox '%s'\n", mailbox);
+			break;
+		}
+
+		if (AST_VECTOR_APPEND(subs, sub)) {
+			ast_log(LOG_ERROR, "Failed to add to MWI sub to vector for mailbox '%s'\n", mailbox);
+			ao2_ref(sub, -1);
+			break;
+		}
+
+		sum_total += i;
+	}
+
+	if (i != MAILBOX_COUNT || running_total != sum_total) {
+		ast_log(LOG_ERROR, "Failed to create all MWI subscriptions: running=%zu, sum=%zu\n",
+				running_total, sum_total);
+		subscriptions_destroy(subs);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int publishers_destroy(struct publishers *pubs)
+{
+	size_t i;
+
+	if (pubs) {
+		/* Remove explicit publishers */
+		AST_VECTOR_CALLBACK_VOID(pubs, ao2_cleanup);
+		AST_VECTOR_FREE(pubs);
+		return 0;
+	}
+
+	for (i = 0; i < MAILBOX_COUNT; ++i) {
+		char mailbox[MAILBOX_SIZE];
+
+		/* Remove implicit publishers */
+		if (num_to_mailbox(mailbox, MAILBOX_SIZE, i)) {
+			return -1;
+		}
+
+		ast_delete_mwi_state(mailbox, NULL);
+	}
+
+	return 0;
+}
+
+static int publishers_create(struct publishers *pubs)
+{
+	size_t i;
+
+	if (AST_VECTOR_INIT(pubs, MAILBOX_COUNT)) {
+		return -1;
+	}
+
+	for (i = 0; i < MAILBOX_COUNT; ++i) {
+		struct ast_mwi_publisher *pub;
+		char mailbox[MAILBOX_SIZE];
+
+		if (num_to_mailbox(mailbox, MAILBOX_SIZE, i)) {
+			break;
+		}
+
+		/* Create the MWI publisher */
+		pub = ast_mwi_add_publisher(mailbox);
+		if (!pub) {
+			ast_log(LOG_ERROR, "Failed to create an MWI publisher for mailbox '%s'\n", mailbox);
+			break;
+		}
+
+		if (AST_VECTOR_APPEND(pubs, pub)) {
+			ast_log(LOG_ERROR, "Failed to add an MWI publisher to vector for mailbox '%s'\n", mailbox);
+			ao2_ref(pub, -1);
+			break;
+		}
+	}
+
+	if (i != MAILBOX_COUNT) {
+		ast_log(LOG_ERROR, "Failed to create all MWI publishers: count=%zu\n", i);
+		publishers_destroy(pubs);
+		return -1;
+	}
+
+	return 0;
+}
+
+static int implicit_publish_cb(struct ast_mwi_state *mwi_state, void *data)
+{
+	int num; /* signed, so the negative-return check below is reachable */
+
+	if (!ast_begins_with(mwi_state->uniqueid, MAILBOX_PREFIX)) {
+		/* Ignore any mailboxes not prefixed */
+		return 0;
+	}
+
+	num = validate_data(mwi_state);
+	if (num < 0) {
+		return CMP_STOP;
+	}
+
+	ast_mwi_publish_by_mailbox(mwi_state->uniqueid, NULL, num, num, num, NULL, NULL);
+
+	return 0;
+}
+
+static int explicit_publish_cb(struct ast_mwi_state *mwi_state, void *data)
+{
+	struct publishers *pubs = data;
+	struct ast_mwi_publisher *pub;
+	size_t num;
+
+	if (!ast_begins_with(mwi_state->uniqueid, MAILBOX_PREFIX)) {
+		/* Ignore any mailboxes not prefixed */
+		return 0;
+	}
+
+	/* Test the result directly; stored in a size_t it could never be < 0 */
+	if (validate_data(mwi_state) < 0) {
+		return CMP_STOP;
+	}
+
+	if (mailbox_to_num(mwi_state->uniqueid, &num)) {
+		return CMP_STOP;
+	}
+
+	/* Mailbox number will always be the index */
+	pub = AST_VECTOR_GET(pubs, num);
+
+	if (!pub) {
+		ast_log(LOG_ERROR, "Unable to locate MWI publisher for mailbox '%s'\n", mwi_state->uniqueid);
+		return CMP_STOP;
+	}
+
+	ast_mwi_publish(pub, num, num, num, NULL, NULL);
+
+	return 0;
+}
+
+static int publish(on_mwi_state cb, void *user_data)
+{
+	/* First time there is no state data */
+	expect_zero = 1;
+
+	running_total = 0;
+	ast_mwi_state_callback_all(cb, user_data);
+
+	if (running_total != sum_total) {
+		ast_log(LOG_ERROR, "Failed MWI state callback (1): running=%zu, sum=%zu\n",
+				running_total, sum_total);
+		return -1;
+	}
+
+	/* Second time check valid state data exists */
+	running_total = expect_zero = 0;
+	ast_mwi_state_callback_all(cb, user_data);
+
+	if (running_total != sum_total) {
+		ast_log(LOG_ERROR, "Failed MWI state callback (2): running=%zu, sum=%zu\n",
+				running_total, sum_total);
+		return -1;
+	}
+
+	return 0;
+}
+
+AST_TEST_DEFINE(implicit_publish)
+{
+	struct subscriptions subs;
+	int rc = AST_TEST_PASS;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test implicit publishing of MWI state";
+		info->description = info->summary;
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	ast_test_validate(test, !subscriptions_create(&subs));
+
+	ast_test_validate_cleanup(test, !publish(implicit_publish_cb, NULL),
+		rc, cleanup);
+
+cleanup:
+	if (subscriptions_destroy(&subs) || publishers_destroy(NULL)) {
+		return AST_TEST_FAIL;
+	}
+
+	return rc;
+}
+
+AST_TEST_DEFINE(explicit_publish)
+{
+	struct subscriptions subs;
+	struct publishers pubs;
+	int rc = AST_TEST_PASS;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test explicit publishing of MWI state";
+		info->description = info->summary;
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	ast_test_validate(test, !subscriptions_create(&subs));
+	ast_test_validate_cleanup(test, !publishers_create(&pubs), rc, cleanup);
+
+	ast_test_validate_cleanup(test, !publish(explicit_publish_cb, &pubs),
+		rc, cleanup);
+
+cleanup:
+	if (subscriptions_destroy(&subs) || publishers_destroy(&pubs)) {
+		return AST_TEST_FAIL;
+	}
+
+	return rc;
+}
+
+static int unload_module(void)
+{
+	AST_TEST_UNREGISTER(implicit_publish);
+	AST_TEST_UNREGISTER(explicit_publish);
+
+	return 0;
+}
+
+static int load_module(void)
+{
+	AST_TEST_REGISTER(implicit_publish);
+	AST_TEST_REGISTER(explicit_publish);
+
+	return AST_MODULE_LOAD_SUCCESS;
+}
+
+AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "MWI testing");

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/11463

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Change-Id: I93f935f9090cd5ddff6d4bc80ff90703c05cf776
Gerrit-Change-Number: 11463
Gerrit-PatchSet: 6
Gerrit-Owner: Kevin Harwell <kharwell at digium.com>
Gerrit-Reviewer: Benjamin Keith Ford <bford at digium.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>
Gerrit-MessageType: merged