[asterisk-commits] kmoore: branch kmoore/stasis-mwi r382480 - in /team/kmoore/stasis-mwi: ./ include/asterisk/ main/ tests/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Mar 5 21:31:21 CST 2013


Author: kmoore
Date: Tue Mar  5 21:31:17 2013
New Revision: 382480

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382480
Log:
Address leakage of caching_topic subscriptions.

There's still a leak in the cache, but hopefully I'll clear that up after
bringing in kmoore's subscribe/unsubscribe notification patch.
........

Merged revisions 382477 from http://svn.asterisk.org/svn/asterisk/team/dlee/stasis-core

Modified:
    team/kmoore/stasis-mwi/   (props changed)
    team/kmoore/stasis-mwi/include/asterisk/stasis.h
    team/kmoore/stasis-mwi/main/app.c
    team/kmoore/stasis-mwi/main/channel.c
    team/kmoore/stasis-mwi/main/stasis_cache.c
    team/kmoore/stasis-mwi/tests/test_stasis.c

Propchange: team/kmoore/stasis-mwi/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Mar  5 21:31:17 2013
@@ -1,1 +1,1 @@
-/team/dlee/stasis-core:1-382460
+/team/dlee/stasis-core:1-382479

Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Tue Mar  5 21:31:17 2013
@@ -453,7 +453,14 @@
  *         messages, and forwards all other messages from the original topic.
  * \since 12
  */
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub);
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+
+/*!
+ * Unsubscribes a caching topic from its upstream topic.
+ * \param caching_topic Caching topic to unsubscribe
+ * \since 12
+ */
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
 
 /*!
  * \brief Returns the topic of cached events from a caching topics.

Modified: team/kmoore/stasis-mwi/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/app.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/main/app.c (original)
+++ team/kmoore/stasis-mwi/main/app.c Tue Mar  5 21:31:17 2013
@@ -82,7 +82,6 @@
 
 static struct stasis_topic *__mwi_topic_all;
 static struct stasis_caching_topic *__mwi_topic_cached;
-static struct stasis_subscription *__mwi_cache_sub;
 static struct stasis_message_type *__mwi_message_type;
 static struct ao2_container *__mwi_topics;
 
@@ -2790,8 +2789,6 @@
 	__mwi_message_type = NULL;
 	ao2_cleanup(__mwi_topics);
 	__mwi_topics = NULL;
-	stasis_unsubscribe(__mwi_cache_sub);
-	__mwi_cache_sub = NULL;
 }
 
 #define MWI_TOPIC_BUCKETS 57
@@ -2799,7 +2796,7 @@
 int app_init(void)
 {
 	__mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
-	__mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id, &__mwi_cache_sub);
+	__mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id);
 	__mwi_message_type = stasis_message_type_create("stasis_mwi_state");
 	__mwi_topics = ao2_container_alloc(MWI_TOPIC_BUCKETS, mwi_topic_hash, mwi_topic_cmp);
 	ast_register_atexit(app_exit);

Modified: team/kmoore/stasis-mwi/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/channel.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/main/channel.c (original)
+++ team/kmoore/stasis-mwi/main/channel.c Tue Mar  5 21:31:17 2013
@@ -160,8 +160,6 @@
 struct stasis_topic *__channel_events_all;
 
 struct stasis_caching_topic *__channel_events_all_cached;
-
-struct stasis_subscription *__channel_cache_sub;
 
 /*! \brief map AST_CAUSE's to readable string representations
  *
@@ -8668,7 +8666,7 @@
 	__channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
 
 	__channel_events_all = stasis_topic_create("ast_channel_events_all");
-	__channel_events_all_cached = stasis_caching_topic_create(__channel_events_all, channel_snapshot_get_id, &__channel_cache_sub);
+	__channel_events_all_cached = stasis_caching_topic_create(__channel_events_all, channel_snapshot_get_id);
 
 	ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
 

Modified: team/kmoore/stasis-mwi/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis_cache.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis_cache.c (original)
+++ team/kmoore/stasis-mwi/main/stasis_cache.c Tue Mar  5 21:31:17 2013
@@ -45,6 +45,7 @@
 struct stasis_caching_topic {
 	struct ao2_container *cache;
 	struct stasis_topic *topic;
+	struct stasis_subscription *sub;
 	snapshot_get_id id_fn;
 };
 
@@ -59,6 +60,19 @@
 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
 {
 	return caching_topic->topic;
+}
+
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
+{
+	if (caching_topic) {
+		if (caching_topic->sub) {
+			stasis_unsubscribe(caching_topic->sub);
+			caching_topic->sub = NULL;
+			ao2_cleanup(caching_topic);
+		} else {
+			ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
+		}
+	}
 }
 
 struct cache_entry {
@@ -146,7 +160,6 @@
 		/* Remove entry from cache */
 		cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
 		old_snapshot = cached_entry->snapshot;
-		ao2_ref(old_snapshot, +1);
 	} else {
 		/* Insert/update cache */
 		SCOPED_AO2LOCK(lock, caching_topic->cache);
@@ -389,13 +402,13 @@
 	}
 }
 
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub)
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
 {
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
 	RAII_VAR(char *, new_name, NULL, free);
+	struct stasis_subscription *sub;
 	int ret;
 
-	ast_assert(sub != NULL);
 	ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
 	if (ret < 0) {
 		return NULL;
@@ -419,12 +432,13 @@
 
 	caching_topic->id_fn = id_fn;
 
-	*sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
-	if (*sub == NULL) {
+	sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+	if (sub == NULL) {
 		return NULL;
 	}
 	/* This is for the reference contained in the subscription above */
 	ao2_ref(caching_topic, +1);
+	caching_topic->sub = sub;
 
 	ao2_ref(caching_topic, +1);
 	return caching_topic;

Modified: team/kmoore/stasis-mwi/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/tests/test_stasis.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/tests/test_stasis.c (original)
+++ team/kmoore/stasis-mwi/tests/test_stasis.c Tue Mar  5 21:31:17 2013
@@ -487,8 +487,7 @@
 {
 	RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
@@ -521,7 +520,7 @@
 	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
 	ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
 
-	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
+	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
 	ast_test_validate(test, NULL != caching_topic);
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
@@ -559,8 +558,7 @@
 {
 	RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
@@ -598,7 +596,7 @@
 	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
 	ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
 
-	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
+	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
 	ast_test_validate(test, NULL != caching_topic);
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);




More information about the asterisk-commits mailing list