[asterisk-commits] kmoore: branch kmoore/stasis-mwi r382465 - in /team/kmoore/stasis-mwi: includ...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Mar 5 12:43:47 CST 2013


Author: kmoore
Date: Tue Mar  5 12:43:44 2013
New Revision: 382465

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382465
Log:
Fix the memory corruption/void* lifetime management problems

Change the subscription callback signature to include the
stasis_subscription object, so that a callback can compare the
subscription's uniqueid against incoming subscription change messages
and destroy objects handed to the subscription once its final
message (its own "Unsubscribe") arrives.

Correct stasis_cache to manage object lifetime properly.

Correct stasis tests to manage object lifetime properly.

Introduce a new function (stasis_subscription_final_message) to assist
in determining which message is the callback's final message.

Modified:
    team/kmoore/stasis-mwi/include/asterisk/stasis.h
    team/kmoore/stasis-mwi/main/stasis.c
    team/kmoore/stasis-mwi/main/stasis_cache.c
    team/kmoore/stasis-mwi/tests/test_stasis.c

Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382465&r1=382464&r2=382465
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Tue Mar  5 12:43:44 2013
@@ -280,19 +280,19 @@
 /*! @{ */
 
 /*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
+
+/*!
  * \brief Callback function type for Stasis subscriptions.
  * \param data Data field provided with subscription.
  * \param topic Topic to which the message was published.
  * \param message Published message.
  * \since 12
  */
-typedef void (*stasis_subscription_cb)(void *data, struct stasis_topic *topic, struct stasis_message *message);
-
-/*!
- * \brief Opaque type for a Stasis subscription.
- * \since 12
- */
-struct stasis_subscription;
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
 
 /*!
  * \brief Create a subscription.
@@ -352,6 +352,17 @@
  * \since 12
  */
 const char *stasis_subscription_uniqueid(struct stasis_subscription *sub);
+
+/*!
+ * \brief Determine whether a message is the final message to be received on a subscription.
+ *
+ * \param sub Subscription on which the message was received.
+ * \param msg Message to check.
+ * \return zero if the provided message is not the final message.
+ * \return non-zero if the provided message is the final message.
+ * \since 12
+ */
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
 
 /*!
  * \brief Holds details about changes to subscriptions for the specified topic

Modified: team/kmoore/stasis-mwi/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis.c?view=diff&rev=382465&r1=382464&r2=382465
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis.c (original)
+++ team/kmoore/stasis-mwi/main/stasis.c Tue Mar  5 12:43:44 2013
@@ -175,7 +175,6 @@
 		size_t i;
 		struct stasis_topic *topic = sub->topic;
 		SCOPED_AO2LOCK(lock_topic, topic);
-		cleanup_after_unlock = sub;
 
 		for (i = 0; i < topic->num_subscribers_current; ++i) {
 			if (topic->subscribers[i] == sub) {
@@ -195,6 +194,25 @@
 const char *stasis_subscription_uniqueid(struct stasis_subscription *sub)
 {
 	return sub->uniqueid;
+}
+
+int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
+{
+	struct stasis_subscription_change *change;
+	if (stasis_message_type(msg) != stasis_subscription_change()) {
+		return 0;
+	}
+
+	change = stasis_message_data(msg);
+	if (strcmp("Unsubscribe", change->description)) {
+		return 0;
+	}
+
+	if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
+		return 0;
+	}
+
+	return 1;
 }
 
 /*!
@@ -287,6 +305,7 @@
 	sub_topic = dispatch->sub->topic;
 
 	dispatch->sub->callback(dispatch->sub->data,
+				dispatch->sub,
 				sub_topic,
 				dispatch->message);
 
@@ -339,7 +358,7 @@
 			}
 		} else {
 			/* No mailbox; dispatch directly */
-			sub->callback(sub->data, sub->topic, message);
+			sub->callback(sub->data, sub, sub->topic, message);
 		}
 	}
 
@@ -362,7 +381,7 @@
 }
 
 /*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_topic *topic, struct stasis_message *message)
+static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
 {
 	struct stasis_topic *to_topic = data;
 	stasis_forward_message(to_topic, topic, message);

Modified: team/kmoore/stasis-mwi/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis_cache.c?view=diff&rev=382465&r1=382464&r2=382465
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis_cache.c (original)
+++ team/kmoore/stasis-mwi/main/stasis_cache.c Tue Mar  5 12:43:44 2013
@@ -45,7 +45,6 @@
 struct stasis_caching_topic {
 	struct ao2_container *cache;
 	struct stasis_topic *topic;
-	char *uniqueid;
 	snapshot_get_id id_fn;
 };
 
@@ -55,8 +54,6 @@
 	caching_topic->cache = NULL;
 	ao2_cleanup(caching_topic->topic);
 	caching_topic->topic = NULL;
-	ast_free(caching_topic->uniqueid);
-	caching_topic->uniqueid = NULL;
 }
 
 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
@@ -340,7 +337,7 @@
 	return msg;
 }
 
-static void caching_topic_exec(void *data, struct stasis_topic *topic, struct stasis_message *message)
+static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
 {
 	RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
 	struct stasis_caching_topic *caching_topic = data;
@@ -349,12 +346,8 @@
 	ast_assert(caching_topic->topic != NULL);
 	ast_assert(caching_topic->id_fn != NULL);
 
-	if (stasis_subscription_change() == stasis_message_type(message)) {
-		struct stasis_subscription_change *change = stasis_message_data(message);
-		/* If this is the last message this caching topic will ever receive */
-		if (!strcmp("Unsubscribe", change->description) && !strcmp(change->uniqueid, caching_topic->uniqueid)) {
-			caching_topic_needs_unref = caching_topic;
-		}
+	if (stasis_subscription_final_message(sub, message)) {
+		caching_topic_needs_unref = caching_topic;
 	}
 
 	/* Handle cache clear event */
@@ -430,16 +423,10 @@
 	if (*sub == NULL) {
 		return NULL;
 	}
-
-	caching_topic->uniqueid = ast_strdup(stasis_subscription_uniqueid(*sub));
-	if (caching_topic->uniqueid == NULL) {
-		return NULL;
-	}
-
-	/* Bump this one extra because of the reference living in the
-	 * subscription that will be cleared by the callback upon the
-	 * subscription being cancelled */
-	ao2_ref(caching_topic, +2);
+	/* This is for the reference contained in the subscription above */
+	ao2_ref(caching_topic, +1);
+
+	ao2_ref(caching_topic, +1);
 	return caching_topic;
 }
 

Modified: team/kmoore/stasis-mwi/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/tests/test_stasis.c?view=diff&rev=382465&r1=382464&r2=382465
==============================================================================
--- team/kmoore/stasis-mwi/tests/test_stasis.c (original)
+++ team/kmoore/stasis-mwi/tests/test_stasis.c Tue Mar  5 12:43:44 2013
@@ -119,6 +119,15 @@
 	size_t unsubs_rxed;
 };
 
+static struct stasis_subscription * local_stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, struct consumer *consumer)
+{
+	struct stasis_subscription *sub;
+	/* Bump the ref count of the consumer since it's being held by the subscription */
+	ao2_ref(consumer, +1);
+	sub = stasis_subscribe(topic, callback, consumer);
+	return sub;
+}
+
 enum consumer_rx_type {
 	MESSAGE,
 	SUBSCRIPTION,
@@ -155,16 +164,21 @@
 	return consumer;
 }
 
-static void consumer_exec(void *data, struct stasis_topic *topic, struct stasis_message *message) {
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) {
 	struct consumer *consumer = data;
+	RAII_VAR(struct consumer *, consumer_needs_unref, NULL, ao2_cleanup);
 
 	SCOPED_MUTEX(lock, &consumer->lock);
 	if (stasis_message_type(message) == stasis_subscription_change()) {
 		struct stasis_subscription_change *change = stasis_message_data(message);
+
 		if (!strcmp("Subscribe", change->description)) {
 			++consumer->subs_rxed;
 		} else if (!strcmp("Unsubscribe", change->description)) {
 			++consumer->unsubs_rxed;
+			if (!strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
+				consumer_needs_unref = consumer;
+			}
 		}
 
 		ast_cond_signal(&consumer->out);
@@ -279,7 +293,7 @@
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
 
-	uut = stasis_subscribe(topic, consumer_exec, consumer);
+	uut = local_stasis_subscribe(topic, consumer_exec, consumer);
 	ast_test_validate(test, NULL != uut);
 
 	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
@@ -327,7 +341,7 @@
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
 
-	uut = stasis_subscribe(topic, consumer_exec, consumer);
+	uut = local_stasis_subscribe(topic, consumer_exec, consumer);
 	ast_test_validate(test, NULL != uut);
 
 	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
@@ -397,13 +411,13 @@
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
 
-	parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
+	parent_sub = local_stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
 	ast_test_validate(test, NULL != parent_sub);
 
 	actual_len = consumer_wait_for(parent_consumer, 1, SUBSCRIPTION);
 	ast_test_validate(test, 1 == parent_consumer->subs_rxed);
 
-	sub = stasis_subscribe(topic, consumer_exec, consumer);
+	sub = local_stasis_subscribe(topic, consumer_exec, consumer);
 	ast_test_validate(test, NULL != sub);
 
 	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
@@ -501,7 +515,7 @@
 	ast_test_validate(test, NULL != topic);
 	consumer_non_caching = consumer_create();
 	ast_test_validate(test, NULL != consumer_non_caching);
-	sub_non_caching = stasis_subscribe(topic, consumer_exec, consumer_non_caching);
+	sub_non_caching = local_stasis_subscribe(topic, consumer_exec, consumer_non_caching);
 	ast_test_validate(test, NULL != sub_non_caching);
 
 	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
@@ -519,7 +533,7 @@
 	actual_len = consumer_should_stay(consumer, 0, SUBSCRIPTION);
 	ast_test_validate(test, 0 == consumer->subs_rxed);
 
-	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
+	sub = local_stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
 	ast_test_validate(test, NULL != sub);
 
 	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
@@ -578,7 +592,7 @@
 	ast_test_validate(test, NULL != topic);
 	consumer_non_caching = consumer_create();
 	ast_test_validate(test, NULL != consumer_non_caching);
-	sub_non_caching = stasis_subscribe(topic, consumer_exec, consumer_non_caching);
+	sub_non_caching = local_stasis_subscribe(topic, consumer_exec, consumer_non_caching);
 	ast_test_validate(test, NULL != sub_non_caching);
 
 	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
@@ -595,7 +609,7 @@
 	 * caching topic */
 	actual_len = consumer_should_stay(consumer, 0, SUBSCRIPTION);
 
-	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
+	sub = local_stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
 	ast_test_validate(test, NULL != sub);
 
 	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);




More information about the asterisk-commits mailing list