[asterisk-commits] kmoore: branch kmoore/stasis-mwi r382439 - in /team/kmoore/stasis-mwi: includ...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Mar 4 23:40:56 CST 2013


Author: kmoore
Date: Mon Mar  4 23:40:52 2013
New Revision: 382439

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382439
Log:
Initial attempt at mitigating the race condition

Modified:
    team/kmoore/stasis-mwi/include/asterisk/stasis.h
    team/kmoore/stasis-mwi/main/app.c
    team/kmoore/stasis-mwi/main/channel.c
    team/kmoore/stasis-mwi/main/stasis.c
    team/kmoore/stasis-mwi/main/stasis_cache.c
    team/kmoore/stasis-mwi/tests/test_stasis.c

Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Mon Mar  4 23:40:52 2013
@@ -343,6 +343,15 @@
  * \since 12
  */
 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*!
+ * \brief Get the unique ID for the subscription.
+ *
+ * \param sub Subscription for which to get the unique ID.
+ * \return Unique ID for the subscription.
+ * \since 12
+ */
+const char *stasis_subscription_uniqueid(struct stasis_subscription *sub);
 
 /*!
  * \brief Holds details about changes to subscriptions for the specified topic
@@ -428,11 +437,12 @@
  *
  * \param original_topic Topic publishing snapshot messages.
  * \param id_fn Callback to extract the id from a snapshot message.
+ * \param[out] Subscription produced by the caching backend held for the life of the cache concurrently with the returned stasis_caching_topic.
  * \return New topic which changes snapshot messages to stasis_cache_update()
  *         messages, and forwards all other messages from the original topic.
  * \since 12
  */
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub);
 
 /*!
  * \brief Returns the topic of cached events from a caching topics.

Modified: team/kmoore/stasis-mwi/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/app.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/app.c (original)
+++ team/kmoore/stasis-mwi/main/app.c Mon Mar  4 23:40:52 2013
@@ -82,6 +82,7 @@
 
 static struct stasis_topic *__mwi_topic_all;
 static struct stasis_caching_topic *__mwi_topic_cached;
+static struct stasis_subscription *__mwi_cache_sub;
 static struct stasis_message_type *__mwi_message_type;
 static struct ao2_container *__mwi_topics;
 
@@ -2784,6 +2785,8 @@
 	__mwi_message_type = NULL;
 	ao2_cleanup(__mwi_topics);
 	__mwi_topics = NULL;
+	stasis_unsubscribe(__mwi_cache_sub);
+	__mwi_cache_sub = NULL;
 }
 
 #define MWI_TOPIC_BUCKETS 57
@@ -2791,7 +2794,7 @@
 int app_init(void)
 {
 	__mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
-	__mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id);
+	__mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id, &__mwi_cache_sub);
 	__mwi_message_type = stasis_message_type_create("stasis_mwi_state");
 	__mwi_topics = ao2_container_alloc(MWI_TOPIC_BUCKETS, mwi_topic_hash, mwi_topic_cmp);
 	ast_register_atexit(app_exit);

Modified: team/kmoore/stasis-mwi/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/channel.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/channel.c (original)
+++ team/kmoore/stasis-mwi/main/channel.c Mon Mar  4 23:40:52 2013
@@ -153,13 +153,15 @@
 static struct ao2_container *channels;
 
 /*! \brief Message type for channel snapshot events */
-static struct stasis_message_type *__ast_channel_snapshot;
-
-static struct stasis_message_type *__ast_channel_varset_event;
-
-struct stasis_topic *__ast_channel_events_all;
-
-struct stasis_caching_topic *__ast_channel_events_all_cached;
+static struct stasis_message_type *__channel_snapshot;
+
+static struct stasis_message_type *__channel_varset_event;
+
+struct stasis_topic *__channel_events_all;
+
+struct stasis_caching_topic *__channel_events_all_cached;
+
+struct stasis_subscription *__channel_cache_sub;
 
 /*! \brief map AST_CAUSE's to readable string representations
  *
@@ -8629,12 +8631,12 @@
 
 static void channels_shutdown(void)
 {
-	ao2_cleanup(__ast_channel_snapshot);
-	__ast_channel_snapshot = NULL;
-	ao2_cleanup(__ast_channel_events_all);
-	__ast_channel_events_all = NULL;
-	ao2_cleanup(__ast_channel_events_all_cached);
-	__ast_channel_events_all_cached = NULL;
+	ao2_cleanup(__channel_snapshot);
+	__channel_snapshot = NULL;
+	ao2_cleanup(__channel_events_all);
+	__channel_events_all = NULL;
+	ao2_cleanup(__channel_events_all_cached);
+	__channel_events_all_cached = NULL;
 	ast_data_unregister(NULL);
 	ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
 	if (channels) {
@@ -8662,11 +8664,11 @@
 		ao2_container_register("channels", channels, prnt_channel_key);
 	}
 
-	__ast_channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
-	__ast_channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
-
-	__ast_channel_events_all = stasis_topic_create("ast_channel_events_all");
-	__ast_channel_events_all_cached = stasis_caching_topic_create(__ast_channel_events_all, channel_snapshot_get_id);
+	__channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+	__channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
+
+	__channel_events_all = stasis_topic_create("ast_channel_events_all");
+	__channel_events_all_cached = stasis_caching_topic_create(__channel_events_all, channel_snapshot_get_id, &__channel_cache_sub);
 
 	ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
 
@@ -11304,22 +11306,22 @@
 
 struct stasis_message_type *ast_channel_varset_event(void)
 {
-	return __ast_channel_varset_event;
+	return __channel_varset_event;
 }
 
 struct stasis_message_type *ast_channel_snapshot(void)
 {
-	return __ast_channel_snapshot;
+	return __channel_snapshot;
 }
 
 struct stasis_topic *ast_channel_events_all(void)
 {
-	return __ast_channel_events_all;
+	return __channel_events_all;
 }
 
 struct stasis_caching_topic *ast_channel_events_all_cached(void)
 {
-	return __ast_channel_events_all_cached;
+	return __channel_events_all_cached;
 }
 
 /* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY

Modified: team/kmoore/stasis-mwi/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis.c (original)
+++ team/kmoore/stasis-mwi/main/stasis.c Mon Mar  4 23:40:52 2013
@@ -66,9 +66,6 @@
 	struct stasis_topic *topic = obj;
 	ast_free(topic->name);
 	topic->name = NULL;
-	while (topic->num_subscribers_current > 0) {
-		stasis_unsubscribe(topic->subscribers[0]);
-	}
 	ast_free(topic->subscribers);
 	topic->subscribers = NULL;
 }
@@ -106,8 +103,8 @@
 /*! \private */
 struct stasis_subscription {
 	/*! Unique ID for this subscription */
-	char *id;
-	/*! Native pointer to the topic. AO2 ref could cause a cycle. */
+	char *uniqueid;
+	/*! Topic subscribed to. */
 	struct stasis_topic *topic;
 	/*! Mailbox for processing incoming messages. */
 	struct ast_taskprocessor *mailbox;
@@ -120,8 +117,6 @@
 static void subscription_dtor(void *obj)
 {
 	struct stasis_subscription *sub = obj;
-	/* This should never be called until after we've been unsubscribed,
-	 * which means that topic will have been reffed for the orphaned sub */
 	ao2_cleanup(sub->topic);
 	ao2_cleanup(sub->mailbox);
 	sub->mailbox = NULL;
@@ -153,8 +148,9 @@
 		}
 	}
 
-	sub->id = ast_strdup(stupid_name);
-	sub->topic = topic; /* Don't increase the refcount, or it will cause a cyclic ref! */
+	sub->uniqueid = ast_strdup(stupid_name);
+	ao2_ref(topic, +1);
+	sub->topic = topic;
 	sub->callback = callback;
 	sub->data = data;
 
@@ -177,6 +173,7 @@
 	if (sub) {
 		RAII_VAR(struct stasis_subscription *, cleanup_after_unlock, NULL, ao2_cleanup);
 		SCOPED_AO2LOCK(lock, sub);
+		cleanup_after_unlock = sub;
 
 		if (sub->topic) {
 			size_t i;
@@ -185,14 +182,9 @@
 
 			for (i = 0; i < topic->num_subscribers_current; ++i) {
 				if (topic->subscribers[i] == sub) {
-					send_subscription_change_message(topic, sub->id, "Unsubscribe", 1);
+					send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe", 1);
 					/* swap [i] with last entry; remove last entry */
 					topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
-					/* We can't clean up now, since the lock is held. defer to RAII */
-					cleanup_after_unlock = sub;
-					/* Now that the topic will no longer have a ref on the sub,
-					 * the sub can have a ref on the topic without creating a cyclic ref */
-					ao2_ref(sub->topic, +1);
 					return;
 				}
 			}
@@ -200,6 +192,11 @@
 			ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
 		}
 	}
+}
+
+const char *stasis_subscription_uniqueid(struct stasis_subscription *sub)
+{
+	return sub->uniqueid;
 }
 
 /*!
@@ -224,7 +221,7 @@
 		topic->num_subscribers_max *= 2;
 	}
 
-	ao2_ref(sub, +1);
+	/* Don't ref sub here or we'll cause a reference cycle. */
 	topic->subscribers[topic->num_subscribers_current++] = sub;
 	return 0;
 }
@@ -433,14 +430,14 @@
 		return;
 	}
 
-	ao2_ref(change, +1);
-	ao2_ref(msg, +1);
 	__forward_message_full(topic, topic, msg, topic_locked);
 }
 
 /*! \brief Cleanup function */
 static void stasis_exit(void)
 {
+	ao2_cleanup(__subscription_change_message_type);
+	__subscription_change_message_type = NULL;
 	ast_threadpool_shutdown(pool);
 	pool = NULL;
 }

Modified: team/kmoore/stasis-mwi/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis_cache.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis_cache.c (original)
+++ team/kmoore/stasis-mwi/main/stasis_cache.c Mon Mar  4 23:40:52 2013
@@ -45,6 +45,7 @@
 struct stasis_caching_topic {
 	struct ao2_container *cache;
 	struct stasis_topic *topic;
+	char *uniqueid;
 	snapshot_get_id id_fn;
 };
 
@@ -54,6 +55,8 @@
 	caching_topic->cache = NULL;
 	ao2_cleanup(caching_topic->topic);
 	caching_topic->topic = NULL;
+	ast_free(caching_topic->uniqueid);
+	caching_topic->uniqueid = NULL;
 }
 
 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
@@ -339,11 +342,20 @@
 
 static void caching_topic_exec(void *data, struct stasis_topic *topic, struct stasis_message *message)
 {
+	RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
 	struct stasis_caching_topic *caching_topic = data;
 	const char *id = NULL;
 
 	ast_assert(caching_topic->topic != NULL);
 	ast_assert(caching_topic->id_fn != NULL);
+
+	if (stasis_subscription_change() == stasis_message_type(message)) {
+		struct stasis_subscription_change *change = stasis_message_data(message);
+		/* If this is the last message this caching topic will ever receive */
+		if (!strcmp("Unsubscribe", change->description) && !strcmp(change->uniqueid, caching_topic->uniqueid)) {
+			caching_topic_needs_unref = caching_topic;
+		}
+	}
 
 	/* Handle cache clear event */
 	if (cache_clear_data() == stasis_message_type(message)) {
@@ -384,13 +396,13 @@
 	}
 }
 
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub)
 {
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
-	struct stasis_subscription *sub;
 	RAII_VAR(char *, new_name, NULL, free);
 	int ret;
 
+	ast_assert(sub != NULL);
 	ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
 	if (ret < 0) {
 		return NULL;
@@ -414,12 +426,20 @@
 
 	caching_topic->id_fn = id_fn;
 
-	sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
-	if (sub == NULL) {
-		return NULL;
-	}
-
-	ao2_ref(caching_topic, +1);
+	*sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+	if (*sub == NULL) {
+		return NULL;
+	}
+
+	caching_topic->uniqueid = ast_strdup(stasis_subscription_uniqueid(*sub));
+	if (caching_topic->uniqueid == NULL) {
+		return NULL;
+	}
+
+	/* Bump this one extra because of the reference living in the
+	 * subscription that will be cleared by the callback upon the
+	 * subscription being cancelled */
+	ao2_ref(caching_topic, +2);
 	return caching_topic;
 }
 

Modified: team/kmoore/stasis-mwi/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/tests/test_stasis.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/tests/test_stasis.c (original)
+++ team/kmoore/stasis-mwi/tests/test_stasis.c Mon Mar  4 23:40:52 2013
@@ -254,7 +254,7 @@
 AST_TEST_DEFINE(subscribe)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, uut, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
 	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
@@ -304,10 +304,10 @@
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, uut, NULL, ao2_cleanup);
 	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+	struct stasis_subscription *uut = NULL;
 	int actual_len;
 
 	switch (cmd) {
@@ -360,9 +360,9 @@
 	RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 
-	RAII_VAR(struct stasis_subscription *, forward_sub, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, parent_sub, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
 
 	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
@@ -473,10 +473,11 @@
 	RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, stasis_unsubscribe);
 	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
 	int actual_len;
 	struct stasis_message_type *actual_type;
@@ -505,7 +506,7 @@
 	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
 	ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
 
-	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
 	ast_test_validate(test, NULL != caching_topic);
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
@@ -544,10 +545,11 @@
 	RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, stasis_unsubscribe);
 	RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
@@ -581,7 +583,7 @@
 	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
 	ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
 
-	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
 	ast_test_validate(test, NULL != caching_topic);
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);




More information about the asterisk-commits mailing list