[svn-commits] dlee: branch dlee/clean-shutdown r388694 - in /team/dlee/clean-shutdown: apps...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Tue May 14 12:36:00 CDT 2013


Author: dlee
Date: Tue May 14 12:35:57 2013
New Revision: 388694

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=388694
Log:
Adding stasis_unsubscribe_and_join

Modified:
    team/dlee/clean-shutdown/apps/app_queue.c
    team/dlee/clean-shutdown/apps/app_voicemail.c
    team/dlee/clean-shutdown/channels/chan_iax2.c
    team/dlee/clean-shutdown/channels/chan_sip.c
    team/dlee/clean-shutdown/funcs/func_presencestate.c
    team/dlee/clean-shutdown/include/asterisk/stasis.h
    team/dlee/clean-shutdown/main/manager.c
    team/dlee/clean-shutdown/main/pbx.c
    team/dlee/clean-shutdown/main/stasis.c
    team/dlee/clean-shutdown/res/res_chan_stats.c
    team/dlee/clean-shutdown/res/res_jabber.c

Modified: team/dlee/clean-shutdown/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/apps/app_queue.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/apps/app_queue.c (original)
+++ team/dlee/clean-shutdown/apps/app_queue.c Tue May 14 12:35:57 2013
@@ -9866,9 +9866,7 @@
 
 	res |= ast_data_unregister(NULL);
 
-	if (device_state_sub) {
-		device_state_sub = stasis_unsubscribe(device_state_sub);
-	}
+	device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
 	ast_extension_state_del(0, extension_state_cb);
 

Modified: team/dlee/clean-shutdown/apps/app_voicemail.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/apps/app_voicemail.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/apps/app_voicemail.c (original)
+++ team/dlee/clean-shutdown/apps/app_voicemail.c Tue May 14 12:35:57 2013
@@ -12689,9 +12689,7 @@
 {
 	poll_thread_run = 0;
 
-	if (mwi_sub_sub) {
-		mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
-	}
+	mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
 
 	ast_mutex_lock(&poll_lock);
 	ast_cond_signal(&poll_cond);

Modified: team/dlee/clean-shutdown/channels/chan_iax2.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/channels/chan_iax2.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/channels/chan_iax2.c (original)
+++ team/dlee/clean-shutdown/channels/chan_iax2.c Tue May 14 12:35:57 2013
@@ -1333,9 +1333,7 @@
 
 static void network_change_stasis_unsubscribe(void)
 {
-	if (network_change_sub) {
-		network_change_sub = stasis_unsubscribe(network_change_sub);
-	}
+	network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
 }
 
 static void acl_change_stasis_subscribe(void)
@@ -1348,9 +1346,7 @@
 
 static void acl_change_stasis_unsubscribe(void)
 {
-	if (acl_change_sub) {
-		acl_change_sub = stasis_unsubscribe(acl_change_sub);
-	}
+	acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 static int network_change_sched_cb(const void *data)
@@ -12415,9 +12411,7 @@
 	if (peer->dnsmgr)
 		ast_dnsmgr_release(peer->dnsmgr);
 
-	if (peer->mwi_event_sub) {
-		peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
-	}
+	peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
 
 	ast_string_field_free_memory(peer);
 }

Modified: team/dlee/clean-shutdown/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/channels/chan_sip.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/channels/chan_sip.c (original)
+++ team/dlee/clean-shutdown/channels/chan_sip.c Tue May 14 12:35:57 2013
@@ -16742,25 +16742,21 @@
 
 static void network_change_stasis_unsubscribe(void)
 {
-	if (network_change_sub) {
-		network_change_sub = stasis_unsubscribe(network_change_sub);
-	}
+	network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
 }
 
 static void acl_change_stasis_subscribe(void)
 {
 	if (!acl_change_sub) {
 		acl_change_sub = stasis_subscribe(ast_acl_topic(),
-		acl_change_stasis_cb, NULL);
+			acl_change_stasis_cb, NULL);
 	}
 
 }
 
 static void acl_change_event_stasis_unsubscribe(void)
 {
-	if (acl_change_sub) {
-		acl_change_sub = stasis_unsubscribe(acl_change_sub);
-	}
+	acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 static int network_change_sched_cb(const void *data)

Modified: team/dlee/clean-shutdown/funcs/func_presencestate.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/funcs/func_presencestate.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/funcs/func_presencestate.c (original)
+++ team/dlee/clean-shutdown/funcs/func_presencestate.c Tue May 14 12:35:57 2013
@@ -706,7 +706,7 @@
 		return AST_TEST_FAIL;
 	}
 
-	test_sub = stasis_unsubscribe(test_sub);
+	test_sub = stasis_unsubscribe_and_join(test_sub);
 
 	ao2_cleanup(cb_data->presence_state);
 	ast_free((char *)cb_data);

Modified: team/dlee/clean-shutdown/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/include/asterisk/stasis.h?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/include/asterisk/stasis.h (original)
+++ team/dlee/clean-shutdown/include/asterisk/stasis.h Tue May 14 12:35:57 2013
@@ -292,8 +292,7 @@
  * \since 12
  */
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
-					     stasis_subscription_cb callback,
-					     void *data);
+	stasis_subscription_cb callback, void *data);
 
 /*!
  * \brief Cancel a subscription.
@@ -304,10 +303,26 @@
  * delivery of the final message.
  *
  * \param subscription Subscription to cancel.
- * \retval NULL for convenience
- * \since 12
- */
-struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe(
+	struct stasis_subscription *subscription);
+
+/*!
+ * \brief Cancel a subscription, blocking until the last message is processed.
+ *
+ * While normally it's recommended to stasis_unsubscribe() and wait ofr
+ * stasis_subscription_final_message(), there are times (like during a module
+ * unload) where you have to wait for the final message (otherwise you'll call
+ * a function in a shared module that no longer exists).
+ *
+ * \param subscription Subscription to cancel.
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe_and_join(
+	struct stasis_subscription *subscription);
 
 /*!
  * \brief Create a subscription which forwards all messages from one topic to
@@ -322,7 +337,8 @@
  * \return \c NULL on error.
  * \since 12
  */
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+	struct stasis_topic *to_topic);
 
 /*!
  * \brief Get the unique ID for the subscription.
@@ -389,7 +405,8 @@
 /*!
  * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
  * \param pooled_topic Topic to which messages will be routed
- * \retval the new stasis_topic_pool or NULL on failure
+ * \return the new stasis_topic_pool
+ * \return \c NULL on failure
  */
 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
 
@@ -397,8 +414,8 @@
  * \brief Find or create a topic in the pool
  * \param pool Pool for which to get the topic
  * \param topic_name Name of the topic to get
- * \retval The already stored or newly allocated topic
- * \retval NULL if the topic was not found and could not be allocated
+ * \return The already stored or newly allocated topic
+ * \return \c NULL if the topic was not found and could not be allocated
  */
 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
 
@@ -498,7 +515,7 @@
 /*!
  * Unsubscribes a caching topic from its upstream topic.
  * \param caching_topic Caching topic to unsubscribe
- * \retval NULL for convenience
+ * \return \c NULL for convenience
  * \since 12
  */
 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
@@ -530,9 +547,9 @@
 /*!
  * \brief Dump cached items to a subscription
  * \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to dump (any type if NULL).
+ * \param type Type of message to dump (any type if \c NULL).
  * \return ao2_container containing all matches (must be unreffed by caller)
- * \return NULL on allocation error
+ * \return \c NULL on allocation error
  * \since 12
  */
 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,

Modified: team/dlee/clean-shutdown/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/main/manager.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/main/manager.c (original)
+++ team/dlee/clean-shutdown/main/manager.c Tue May 14 12:35:57 2013
@@ -1077,9 +1077,7 @@
 
 static void acl_change_stasis_unsubscribe(void)
 {
-	if (acl_change_sub) {
-		acl_change_sub = stasis_unsubscribe(acl_change_sub);
-	}
+	acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
 }
 
 /* In order to understand what the heck is going on with the

Modified: team/dlee/clean-shutdown/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/main/pbx.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/main/pbx.c (original)
+++ team/dlee/clean-shutdown/main/pbx.c Tue May 14 12:35:57 2013
@@ -11700,12 +11700,8 @@
 {
 	int x;
 
-	if (presence_state_sub) {
-		presence_state_sub = stasis_unsubscribe(presence_state_sub);
-	}
-	if (device_state_sub) {
-		device_state_sub = stasis_unsubscribe(device_state_sub);
-	}
+	presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
+	device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 
 	/* Unregister builtin applications */
 	for (x = 0; x < ARRAY_LEN(builtins); x++) {

Modified: team/dlee/clean-shutdown/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/main/stasis.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/main/stasis.c (original)
+++ team/dlee/clean-shutdown/main/stasis.c Tue May 14 12:35:57 2013
@@ -114,6 +114,14 @@
 	stasis_subscription_cb callback;
 	/*! Data pointer to be handed to the callback. */
 	void *data;
+
+	/*! Lock for joining with subscription. */
+	ast_mutex_t join_lock;
+	/*! Condition for joining with subscription. */
+	ast_cond_t join_cond;
+	/*! Flag set when final message for sub has been received.
+	 *  Be sure join_lock is held before reading/setting. */
+	int final_message_rxed;
 };
 
 static void subscription_dtor(void *obj)
@@ -124,6 +132,8 @@
 	sub->topic = NULL;
 	ast_taskprocessor_unreference(sub->mailbox);
 	sub->mailbox = NULL;
+	ast_mutex_destroy(&sub->join_lock);
+	ast_cond_destroy(&sub->join_cond);
 }
 
 /*!
@@ -136,11 +146,17 @@
 				  struct stasis_topic *topic,
 				  struct stasis_message *message)
 {
-	/* Since sub->topic doesn't change, no need to lock sub */
+	/* Since sub is mostly immutable, no need to lock sub */
 	sub->callback(sub->data,
 		      sub,
 		      topic,
 		      message);
+
+	/* The final message flag, though, needs a lock */
+	ast_mutex_lock(&sub->join_lock);
+	sub->final_message_rxed = 1;
+	ast_cond_signal(&sub->join_cond);
+	ast_mutex_unlock(&sub->join_lock);
 }
 
 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
@@ -171,6 +187,8 @@
 	sub->topic = topic;
 	sub->callback = callback;
 	sub->data = data;
+	ast_mutex_init(&sub->join_lock);
+	ast_cond_init(&sub->join_cond, NULL);
 
 	if (topic_add_subscription(topic, sub) != 0) {
 		return NULL;
@@ -209,6 +227,30 @@
 
 		ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
 	}
+	return NULL;
+}
+
+/*!
+ * \brief Block until the final message has been received on a subscription.
+ *
+ * \param subscription Subscription to wait on.
+ */
+static void subscription_join(struct stasis_subscription *subscription)
+{
+	if (subscription) {
+		SCOPED_MUTEX(lock, &subscription->join_lock);
+		while (!subscription->final_message_rxed) {
+			ast_cond_wait(&subscription->join_cond,
+				&subscription->join_lock);
+		}
+	}
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+	struct stasis_subscription *subscription)
+{
+	stasis_unsubscribe(subscription);
+	subscription_join(subscription);
 	return NULL;
 }
 

Modified: team/dlee/clean-shutdown/res/res_chan_stats.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/res/res_chan_stats.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/res/res_chan_stats.c (original)
+++ team/dlee/clean-shutdown/res/res_chan_stats.c Tue May 14 12:35:57 2013
@@ -172,7 +172,7 @@
 
 static int unload_module(void)
 {
-	stasis_unsubscribe(sub);
+	stasis_unsubscribe_and_join(sub);
 	sub = NULL;
 	stasis_message_router_unsubscribe(router);
 	router = NULL;

Modified: team/dlee/clean-shutdown/res/res_jabber.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/clean-shutdown/res/res_jabber.c?view=diff&rev=388694&r1=388693&r2=388694
==============================================================================
--- team/dlee/clean-shutdown/res/res_jabber.c (original)
+++ team/dlee/clean-shutdown/res/res_jabber.c Tue May 14 12:35:57 2013
@@ -4770,12 +4770,8 @@
 	ast_unregister_application(app_ajileave);
 	ast_manager_unregister("JabberSend");
 	ast_custom_function_unregister(&jabberstatus_function);
-	if (mwi_sub) {
-		mwi_sub = stasis_unsubscribe(mwi_sub);
-	}
-	if (device_state_sub) {
-		device_state_sub = stasis_unsubscribe(device_state_sub);
-	}
+	mwi_sub = stasis_unsubscribe_and_join(mwi_sub);
+	device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
 	ast_custom_function_unregister(&jabberreceive_function);
 
 	ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {




More information about the svn-commits mailing list