[asterisk-commits] dlee: trunk r389011 - in /trunk: apps/ channels/ funcs/ include/asterisk/ mai...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri May 17 16:10:45 CDT 2013


Author: dlee
Date: Fri May 17 16:10:32 2013
New Revision: 389011

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=389011
Log:
Fix shutdown assertions in stasis-core

In r388005, macros were introduced to consistently define message
types. This added an assert if a message type was used either before
it was initialized or after it had been cleaned up. It turns out that
this assertion fires during shutdown.

This actually exposed a hidden shutdown ordering problem. Since
unsubscribing is asynchronous, it's possible that the message types
used by the subscription could be freed before the final message of
the subscription was processed.

This patch adds stasis_subscription_join(), which blocks until the
last message has been processed by the subscription. Since joining was
most commonly done right after an unsubscribe, a
stasis_unsubscribe_and_join() convenience function was also added.

Similar functions were also added to the stasis_caching_topic and
stasis_message_router, since they wrap subscriptions and have similar
problems.

Other code in trunk was refactored to join() where appropriate, or at
least verify that the subscription was complete before being
destroyed.

Review: https://reviewboard.asterisk.org/r/2540

Modified:
    trunk/apps/app_queue.c
    trunk/apps/app_voicemail.c
    trunk/channels/chan_iax2.c
    trunk/channels/chan_sip.c
    trunk/funcs/func_presencestate.c
    trunk/include/asterisk/stasis.h
    trunk/include/asterisk/stasis_message_router.h
    trunk/main/app.c
    trunk/main/devicestate.c
    trunk/main/endpoints.c
    trunk/main/manager.c
    trunk/main/manager_channels.c
    trunk/main/pbx.c
    trunk/main/stasis.c
    trunk/main/stasis_cache.c
    trunk/main/stasis_channels.c
    trunk/main/stasis_endpoints.c
    trunk/main/stasis_message_router.c
    trunk/res/res_chan_stats.c
    trunk/res/res_jabber.c
    trunk/res/res_stasis.c

Modified: trunk/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_queue.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/apps/app_queue.c (original)
+++ trunk/apps/app_queue.c Fri May 17 16:10:32 2013
@@ -9866,9 +9866,7 @@
 
 	res |= ast_data_unregister(NULL);
 
-	if (device_state_sub) {
-		device_state_sub = stasis_unsubscribe(device_state_sub);
-	}
+	device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
 	ast_extension_state_del(0, extension_state_cb);
 

Modified: trunk/apps/app_voicemail.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_voicemail.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/apps/app_voicemail.c (original)
+++ trunk/apps/app_voicemail.c Fri May 17 16:10:32 2013
@@ -12689,9 +12689,7 @@
 {
 	poll_thread_run = 0;
 
-	if (mwi_sub_sub) {
-		mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
-	}
+	mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
 
 	ast_mutex_lock(&poll_lock);
 	ast_cond_signal(&poll_cond);

Modified: trunk/channels/chan_iax2.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_iax2.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/channels/chan_iax2.c (original)
+++ trunk/channels/chan_iax2.c Fri May 17 16:10:32 2013
@@ -1334,9 +1334,7 @@
 
 static void network_change_stasis_unsubscribe(void)
 {
-	if (network_change_sub) {
-		network_change_sub = stasis_unsubscribe(network_change_sub);
-	}
+	network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
 }
 
 static void acl_change_stasis_subscribe(void)
@@ -1349,9 +1347,7 @@
 
 static void acl_change_stasis_unsubscribe(void)
 {
-	if (acl_change_sub) {
-		acl_change_sub = stasis_unsubscribe(acl_change_sub);
-	}
+	acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 static int network_change_sched_cb(const void *data)
@@ -12424,9 +12420,7 @@
 	if (peer->dnsmgr)
 		ast_dnsmgr_release(peer->dnsmgr);
 
-	if (peer->mwi_event_sub) {
-		peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
-	}
+	peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
 
 	ast_string_field_free_memory(peer);
 }

Modified: trunk/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_sip.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/channels/chan_sip.c (original)
+++ trunk/channels/chan_sip.c Fri May 17 16:10:32 2013
@@ -16742,25 +16742,21 @@
 
 static void network_change_stasis_unsubscribe(void)
 {
-	if (network_change_sub) {
-		network_change_sub = stasis_unsubscribe(network_change_sub);
-	}
+	network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
 }
 
 static void acl_change_stasis_subscribe(void)
 {
 	if (!acl_change_sub) {
 		acl_change_sub = stasis_subscribe(ast_security_topic(),
-		acl_change_stasis_cb, NULL);
+			acl_change_stasis_cb, NULL);
 	}
 
 }
 
 static void acl_change_event_stasis_unsubscribe(void)
 {
-	if (acl_change_sub) {
-		acl_change_sub = stasis_unsubscribe(acl_change_sub);
-	}
+	acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 static int network_change_sched_cb(const void *data)

Modified: trunk/funcs/func_presencestate.c
URL: http://svnview.digium.com/svn/asterisk/trunk/funcs/func_presencestate.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/funcs/func_presencestate.c (original)
+++ trunk/funcs/func_presencestate.c Fri May 17 16:10:32 2013
@@ -706,7 +706,7 @@
 		return AST_TEST_FAIL;
 	}
 
-	test_sub = stasis_unsubscribe(test_sub);
+	test_sub = stasis_unsubscribe_and_join(test_sub);
 
 	ao2_cleanup(cb_data->presence_state);
 	ast_free((char *)cb_data);

Modified: trunk/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis.h?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/include/asterisk/stasis.h (original)
+++ trunk/include/asterisk/stasis.h Fri May 17 16:10:32 2013
@@ -124,6 +124,34 @@
  * stasis_subscription. Due to cyclic references, the \ref
  * stasis_subscription will not be freed until after it has been unsubscribed,
  * and all other ao2_ref()'s have been cleaned up.
+ *
+ * \par Shutdown
+ *
+ * Subscriptions have two options for unsubscribing, depending upon the context
+ * in which you need to unsubscribe.
+ *
+ * If your subscription is owned by a module, and you must unsubscribe from the
+ * module_unload() function, then you'll want to use the
+ * stasis_unsubscribe_and_join() function. This will block until the final
+ * message has been received on the subscription. Otherwise, there's the danger
+ * of invoking the callback function after it has been unloaded.
+ *
+ * If your subscription is owned by an object, then your object should have an
+ * explicit shutdown() function, which calls stasis_unsubscribe(). In your
+ * subscription handler, when the stasis_subscription_final_message() has been
+ * received, decrement the refcount on your object. In your object's destructor,
+ * you may assert that stasis_subscription_is_done() to validate that the
+ * subscription's callback will no longer be invoked.
+ *
+ * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
+ * an object's destructor. While code that does this may work most of the time,
+ * it's got one big downside. There's a general assumption that object
+ * destruction is non-blocking. If you block the destruction waiting for the
+ * subscription to complete, there's the danger that the subscription may
+ * process a message which will bump the refcount up by one. Then it does
+ * whatever it does, decrements the refcount, which then proceeds to re-destroy
+ * the object. Now you've got hard to reproduce bugs that only show up under
+ * certain loads.
  */
 
 #include "asterisk/utils.h"
@@ -292,8 +320,7 @@
  * \since 12
  */
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
-					     stasis_subscription_cb callback,
-					     void *data);
+	stasis_subscription_cb callback, void *data);
 
 /*!
  * \brief Cancel a subscription.
@@ -304,10 +331,52 @@
  * delivery of the final message.
  *
  * \param subscription Subscription to cancel.
- * \retval NULL for convenience
- * \since 12
- */
-struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe(
+	struct stasis_subscription *subscription);
+
+/*!
+ * \brief Block until the last message is processed on a subscription.
+ *
+ * This function will not return until the \a subscription's callback for the
+ * stasis_subscription_final_message() completes. This allows cleanup routines
+ * to run before unblocking the joining thread.
+ *
+ * \param subscription Subscription to block on.
+ * \since 12
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Returns whether \a subscription has received its final message.
+ *
+ * Note that a subscription is considered done even while the
+ * stasis_subscription_final_message() is being processed. This allows cleanup
+ * routines to check the status of the subscription.
+ *
+ * \param subscription Subscription.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ *         received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_subscription_is_done(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Cancel a subscription, blocking until the last message is processed.
+ *
+ * While normally it's recommended to stasis_unsubscribe() and wait for
+ * stasis_subscription_final_message(), there are times (like during a module
+ * unload) where you have to wait for the final message (otherwise you'll call
+ * a function in a shared module that no longer exists).
+ *
+ * \param subscription Subscription to cancel.
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe_and_join(
+	struct stasis_subscription *subscription);
 
 /*!
  * \brief Create a subscription which forwards all messages from one topic to
@@ -322,7 +391,8 @@
  * \return \c NULL on error.
  * \since 12
  */
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+	struct stasis_topic *to_topic);
 
 /*!
  * \brief Get the unique ID for the subscription.
@@ -389,7 +459,8 @@
 /*!
  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
  * \param pooled_topic Topic to which messages will be routed
- * \retval the new stasis_topic_pool or NULL on failure
+ * \return the new stasis_topic_pool
+ * \return \c NULL on failure
  */
 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
 
@@ -397,8 +468,8 @@
  * \brief Find or create a topic in the pool
  * \param pool Pool for which to get the topic
  * \param topic_name Name of the topic to get
- * \retval The already stored or newly allocated topic
- * \retval NULL if the topic was not found and could not be allocated
+ * \return The already stored or newly allocated topic
+ * \return \c NULL if the topic was not found and could not be allocated
  */
 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
 
@@ -496,12 +567,31 @@
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
 
 /*!
- * Unsubscribes a caching topic from its upstream topic.
+ * \brief Unsubscribes a caching topic from its upstream topic.
+ *
+ * This function returns immediately, so be sure to cleanup when
+ * stasis_subscription_final_message() is received.
+ *
  * \param caching_topic Caching topic to unsubscribe
- * \retval NULL for convenience
- * \since 12
- */
-struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_unsubscribe(
+	struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Unsubscribes a caching topic from its upstream topic, blocking until
+ * all messages have been forwarded.
+ *
+ * See stasis_unsubscribe_and_join() for more info on when to use this as
+ * opposed to stasis_caching_unsubscribe().
+ *
+ * \param caching_topic Caching topic to unsubscribe
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
+	struct stasis_caching_topic *caching_topic);
 
 /*!
  * \brief Returns the topic of cached events from a caching topics.
@@ -530,9 +620,9 @@
 /*!
  * \brief Dump cached items to a subscription
  * \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to dump (any type if NULL).
+ * \param type Type of message to dump (any type if \c NULL).
  * \return ao2_container containing all matches (must be unreffed by caller)
- * \return NULL on allocation error
+ * \return \c NULL on allocation error
  * \since 12
  */
 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,

Modified: trunk/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_message_router.h?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/include/asterisk/stasis_message_router.h (original)
+++ trunk/include/asterisk/stasis_message_router.h Fri May 17 16:10:32 2013
@@ -57,10 +57,34 @@
 
 /*!
  * \brief Unsubscribe the router from the upstream topic.
+ *
  * \param router Router to unsubscribe.
  * \since 12
  */
 void stasis_message_router_unsubscribe(struct stasis_message_router *router);
+
+/*!
+ * \brief Unsubscribe the router from the upstream topic, blocking until the
+ * final message has been processed.
+ *
+ * See stasis_unsubscribe_and_join() for info on when to use this
+ * vs. stasis_message_router_unsubscribe().
+ *
+ * \param router Router to unsubscribe.
+ * \since 12
+ */
+void stasis_message_router_unsubscribe_and_join(
+	struct stasis_message_router *router);
+
+/*!
+ * \brief Returns whether \a router has received its final message.
+ *
+ * \param router Router.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ *         received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_message_router_is_done(struct stasis_message_router *router);
 
 /*!
  * \brief Add a route to a message router.

Modified: trunk/main/app.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/app.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/app.c (original)
+++ trunk/main/app.c Fri May 17 16:10:32 2013
@@ -2727,7 +2727,7 @@
 {
 	ao2_cleanup(mwi_topic_all);
 	mwi_topic_all = NULL;
-	mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+	mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
 	STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
 	ao2_cleanup(mwi_topic_pool);
 	mwi_topic_pool = NULL;

Modified: trunk/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/devicestate.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/devicestate.c (original)
+++ trunk/main/devicestate.c Fri May 17 16:10:32 2013
@@ -776,7 +776,7 @@
 {
 	ao2_cleanup(device_state_topic_all);
 	device_state_topic_all = NULL;
-	device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
+	device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
 	ao2_cleanup(device_state_topic_pool);
 	device_state_topic_pool = NULL;

Modified: trunk/main/endpoints.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/endpoints.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/endpoints.c (original)
+++ trunk/main/endpoints.c Fri May 17 16:10:32 2013
@@ -106,7 +106,9 @@
 	struct ast_endpoint *endpoint = obj;
 
 	/* The router should be shut down already */
-	ast_assert(endpoint->router == NULL);
+	ast_assert(stasis_message_router_is_done(endpoint->router));
+	ao2_cleanup(endpoint->router);
+	endpoint->router = NULL;
 
 	stasis_unsubscribe(endpoint->forward);
 	endpoint->forward = NULL;
@@ -258,8 +260,9 @@
 		stasis_publish(endpoint->topic, message);
 	}
 
+	/* Bump refcount to hold on to the router */
+	ao2_ref(endpoint->router, +1);
 	stasis_message_router_unsubscribe(endpoint->router);
-	endpoint->router = NULL;
 }
 
 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)

Modified: trunk/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/manager.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/manager.c (original)
+++ trunk/main/manager.c Fri May 17 16:10:32 2013
@@ -1077,9 +1077,7 @@
 
 static void acl_change_stasis_unsubscribe(void)
 {
-	if (acl_change_sub) {
-		acl_change_sub = stasis_unsubscribe(acl_change_sub);
-	}
+	acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 /* In order to understand what the heck is going on with the

Modified: trunk/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/manager_channels.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/manager_channels.c (original)
+++ trunk/main/manager_channels.c Fri May 17 16:10:32 2013
@@ -805,7 +805,7 @@
 
 static void manager_channels_shutdown(void)
 {
-	stasis_message_router_unsubscribe(channel_state_router);
+	stasis_message_router_unsubscribe_and_join(channel_state_router);
 	channel_state_router = NULL;
 }
 

Modified: trunk/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/pbx.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/pbx.c (original)
+++ trunk/main/pbx.c Fri May 17 16:10:32 2013
@@ -11700,12 +11700,8 @@
 {
 	int x;
 
-	if (presence_state_sub) {
-		presence_state_sub = stasis_unsubscribe(presence_state_sub);
-	}
-	if (device_state_sub) {
-		device_state_sub = stasis_unsubscribe(device_state_sub);
-	}
+	presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
+	device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
 	/* Unregister builtin applications */
 	for (x = 0; x < ARRAY_LEN(builtins); x++) {

Modified: trunk/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/stasis.c (original)
+++ trunk/main/stasis.c Fri May 17 16:10:32 2013
@@ -114,16 +114,30 @@
 	stasis_subscription_cb callback;
 	/*! Data pointer to be handed to the callback. */
 	void *data;
+
+	/*! Lock for joining with subscription. */
+	ast_mutex_t join_lock;
+	/*! Condition for joining with subscription. */
+	ast_cond_t join_cond;
+	/*! Flag set when final message for sub has been received.
+	 *  Be sure join_lock is held before reading/setting. */
+	int final_message_rxed;
+	/*! Flag set when final message for sub has been processed.
+	 *  Be sure join_lock is held before reading/setting. */
+	int final_message_processed;
 };
 
 static void subscription_dtor(void *obj)
 {
 	struct stasis_subscription *sub = obj;
 	ast_assert(!stasis_subscription_is_subscribed(sub));
+	ast_assert(stasis_subscription_is_done(sub));
 	ao2_cleanup(sub->topic);
 	sub->topic = NULL;
 	ast_taskprocessor_unreference(sub->mailbox);
 	sub->mailbox = NULL;
+	ast_mutex_destroy(&sub->join_lock);
+	ast_cond_destroy(&sub->join_cond);
 }
 
 /*!
@@ -136,11 +150,22 @@
 				  struct stasis_topic *topic,
 				  struct stasis_message *message)
 {
-	/* Since sub->topic doesn't change, no need to lock sub */
-	sub->callback(sub->data,
-		      sub,
-		      topic,
-		      message);
+	/* Notify that the final message has been received */
+	if (stasis_subscription_final_message(sub, message)) {
+		SCOPED_MUTEX(lock, &sub->join_lock);
+		sub->final_message_rxed = 1;
+		ast_cond_signal(&sub->join_cond);
+	}
+
+	/* Since sub is mostly immutable, no need to lock sub */
+	sub->callback(sub->data, sub, topic, message);
+
+	/* Notify that the final message has been processed */
+	if (stasis_subscription_final_message(sub, message)) {
+		SCOPED_MUTEX(lock, &sub->join_lock);
+		sub->final_message_processed = 1;
+		ast_cond_signal(&sub->join_cond);
+	}
 }
 
 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
@@ -171,6 +196,8 @@
 	sub->topic = topic;
 	sub->callback = callback;
 	sub->data = data;
+	ast_mutex_init(&sub->join_lock);
+	ast_cond_init(&sub->join_cond, NULL);
 
 	if (topic_add_subscription(topic, sub) != 0) {
 		return NULL;
@@ -209,6 +236,50 @@
 
 		ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
 	}
+	return NULL;
+}
+
+/*!
+ * \brief Block until the final message has been processed on a subscription.
+ *
+ * \param subscription Subscription to wait on.
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+	if (subscription) {
+		SCOPED_MUTEX(lock, &subscription->join_lock);
+		/* Wait until the processed flag has been set */
+		while (!subscription->final_message_processed) {
+			ast_cond_wait(&subscription->join_cond,
+				&subscription->join_lock);
+		}
+	}
+}
+
+int stasis_subscription_is_done(struct stasis_subscription *subscription)
+{
+	if (subscription) {
+		SCOPED_MUTEX(lock, &subscription->join_lock);
+		return subscription->final_message_rxed;
+	}
+
+	/* Null subscription is about as done as you can get */
+	return 1;
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+	struct stasis_subscription *subscription)
+{
+	if (!subscription) {
+		return NULL;
+	}
+
+	/* Bump refcount to hold it past the unsubscribe */
+	ao2_ref(subscription, +1);
+	stasis_unsubscribe(subscription);
+	stasis_subscription_join(subscription);
+	/* Now decrement the refcount back */
+	ao2_cleanup(subscription);
 	return NULL;
 }
 

Modified: trunk/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis_cache.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/stasis_cache.c (original)
+++ trunk/main/stasis_cache.c Fri May 17 16:10:32 2013
@@ -53,6 +53,8 @@
 static void stasis_caching_topic_dtor(void *obj) {
 	struct stasis_caching_topic *caching_topic = obj;
 	ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+	ast_assert(stasis_subscription_is_done(caching_topic->sub));
+	ao2_cleanup(caching_topic->sub);
 	caching_topic->sub = NULL;
 	ao2_cleanup(caching_topic->cache);
 	caching_topic->cache = NULL;
@@ -69,11 +71,28 @@
 {
 	if (caching_topic) {
 		if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+			/* Increment the reference to hold on to it past the
+			 * unsubscribe */
+			ao2_ref(caching_topic->sub, +1);
 			stasis_unsubscribe(caching_topic->sub);
 		} else {
 			ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
 		}
 	}
+	return NULL;
+}
+
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
+{
+	if (!caching_topic) {
+		return NULL;
+	}
+
+	/* Hold a ref past the unsubscribe */
+	ao2_ref(caching_topic, +1);
+	stasis_caching_unsubscribe(caching_topic);
+	stasis_subscription_join(caching_topic->sub);
+	ao2_cleanup(caching_topic);
 	return NULL;
 }
 

Modified: trunk/main/stasis_channels.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis_channels.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/stasis_channels.c (original)
+++ trunk/main/stasis_channels.c Fri May 17 16:10:32 2013
@@ -505,6 +505,9 @@
 
 void ast_stasis_channels_shutdown(void)
 {
+	channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
+	ao2_cleanup(channel_topic_all);
+	channel_topic_all = NULL;
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
@@ -512,9 +515,6 @@
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
-	ao2_cleanup(channel_topic_all);
-	channel_topic_all = NULL;
-	channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
 }
 
 void ast_stasis_channels_init(void)

Modified: trunk/main/stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis_endpoints.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/stasis_endpoints.c (original)
+++ trunk/main/stasis_endpoints.c Fri May 17 16:10:32 2013
@@ -101,15 +101,6 @@
 }
 
 
-static void endpoints_stasis_shutdown(void)
-{
-	ao2_cleanup(endpoint_topic_all);
-	endpoint_topic_all = NULL;
-
-	stasis_caching_unsubscribe(endpoint_topic_all_cached);
-	endpoint_topic_all_cached = NULL;
-}
-
 struct ast_json *ast_endpoint_snapshot_to_json(
 	const struct ast_endpoint_snapshot *snapshot)
 {
@@ -149,6 +140,15 @@
 	return ast_json_ref(json);
 }
 
+static void endpoints_stasis_shutdown(void)
+{
+	stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
+	endpoint_topic_all_cached = NULL;
+
+	ao2_cleanup(endpoint_topic_all);
+	endpoint_topic_all = NULL;
+}
+
 int ast_endpoint_stasis_init(void)
 {
 	ast_register_atexit(endpoints_stasis_shutdown);

Modified: trunk/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis_message_router.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/main/stasis_message_router.c (original)
+++ trunk/main/stasis_message_router.c Fri May 17 16:10:32 2013
@@ -74,6 +74,7 @@
 	size_t i;
 
 	ast_assert(!stasis_subscription_is_subscribed(router->subscription));
+	ast_assert(stasis_subscription_is_done(router->subscription));
 	router->subscription = NULL;
 	for (i = 0; i < router->num_routes_current; ++i) {
 		ao2_cleanup(router->routes[i]);
@@ -165,6 +166,26 @@
 	stasis_unsubscribe(router->subscription);
 }
 
+void stasis_message_router_unsubscribe_and_join(
+	struct stasis_message_router *router)
+{
+	if (!router) {
+		return;
+	}
+	stasis_unsubscribe_and_join(router->subscription);
+}
+
+int stasis_message_router_is_done(struct stasis_message_router *router)
+{
+	if (!router) {
+		/* Null router is about as done as you can get */
+		return 1;
+	}
+
+	return stasis_subscription_is_done(router->subscription);
+}
+
+
 static struct stasis_message_route *route_create(
 	struct stasis_message_type *message_type,
 	stasis_subscription_cb callback,

Modified: trunk/res/res_chan_stats.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_chan_stats.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/res/res_chan_stats.c (original)
+++ trunk/res/res_chan_stats.c Fri May 17 16:10:32 2013
@@ -172,9 +172,9 @@
 
 static int unload_module(void)
 {
-	stasis_unsubscribe(sub);
+	stasis_unsubscribe_and_join(sub);
 	sub = NULL;
-	stasis_message_router_unsubscribe(router);
+	stasis_message_router_unsubscribe_and_join(router);
 	router = NULL;
 	return 0;
 }

Modified: trunk/res/res_jabber.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_jabber.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/res/res_jabber.c (original)
+++ trunk/res/res_jabber.c Fri May 17 16:10:32 2013
@@ -4770,12 +4770,8 @@
 	ast_unregister_application(app_ajileave);
 	ast_manager_unregister("JabberSend");
 	ast_custom_function_unregister(&jabberstatus_function);
-	if (mwi_sub) {
-		mwi_sub = stasis_unsubscribe(mwi_sub);
-	}
-	if (device_state_sub) {
-		device_state_sub = stasis_unsubscribe(device_state_sub);
-	}
+	mwi_sub = stasis_unsubscribe_and_join(mwi_sub);
+	device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 	ast_custom_function_unregister(&jabberreceive_function);
 
 	ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {

Modified: trunk/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_stasis.c?view=diff&rev=389011&r1=389010&r2=389011
==============================================================================
--- trunk/res/res_stasis.c (original)
+++ trunk/res/res_stasis.c Fri May 17 16:10:32 2013
@@ -722,7 +722,7 @@
 {
 	int r = 0;
 
-	stasis_message_router_unsubscribe(channel_router);
+	stasis_message_router_unsubscribe_and_join(channel_router);
 	channel_router = NULL;
 
 	ao2_cleanup(apps_registry);




More information about the asterisk-commits mailing list