[asterisk-commits] kmoore: branch kmoore/stasis-mwi r382480 - in /team/kmoore/stasis-mwi: ./ inc...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Mar 5 21:31:21 CST 2013
Author: kmoore
Date: Tue Mar 5 21:31:17 2013
New Revision: 382480
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382480
Log:
Address leakage of caching_topic subscriptions.
There's still a leak in the cache, but hopefully I'll clear that up after
bringing in kmoore's subscribe/unsubscribe notification patch.
........
Merged revisions 382477 from http://svn.asterisk.org/svn/asterisk/team/dlee/stasis-core
Modified:
team/kmoore/stasis-mwi/ (props changed)
team/kmoore/stasis-mwi/include/asterisk/stasis.h
team/kmoore/stasis-mwi/main/app.c
team/kmoore/stasis-mwi/main/channel.c
team/kmoore/stasis-mwi/main/stasis_cache.c
team/kmoore/stasis-mwi/tests/test_stasis.c
Propchange: team/kmoore/stasis-mwi/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Mar 5 21:31:17 2013
@@ -1,1 +1,1 @@
-/team/dlee/stasis-core:1-382460
+/team/dlee/stasis-core:1-382479
Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Tue Mar 5 21:31:17 2013
@@ -453,7 +453,14 @@
* messages, and forwards all other messages from the original topic.
* \since 12
*/
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub);
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+
+/*!
+ * Unsubscribes a caching topic from its upstream topic.
+ * \param caching_topic Caching topic to unsubscribe
+ * \since 12
+ */
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
/*!
* \brief Returns the topic of cached events from a caching topics.
Modified: team/kmoore/stasis-mwi/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/app.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/main/app.c (original)
+++ team/kmoore/stasis-mwi/main/app.c Tue Mar 5 21:31:17 2013
@@ -82,7 +82,6 @@
static struct stasis_topic *__mwi_topic_all;
static struct stasis_caching_topic *__mwi_topic_cached;
-static struct stasis_subscription *__mwi_cache_sub;
static struct stasis_message_type *__mwi_message_type;
static struct ao2_container *__mwi_topics;
@@ -2790,8 +2789,6 @@
__mwi_message_type = NULL;
ao2_cleanup(__mwi_topics);
__mwi_topics = NULL;
- stasis_unsubscribe(__mwi_cache_sub);
- __mwi_cache_sub = NULL;
}
#define MWI_TOPIC_BUCKETS 57
@@ -2799,7 +2796,7 @@
int app_init(void)
{
__mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
- __mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id, &__mwi_cache_sub);
+ __mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id);
__mwi_message_type = stasis_message_type_create("stasis_mwi_state");
__mwi_topics = ao2_container_alloc(MWI_TOPIC_BUCKETS, mwi_topic_hash, mwi_topic_cmp);
ast_register_atexit(app_exit);
Modified: team/kmoore/stasis-mwi/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/channel.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/main/channel.c (original)
+++ team/kmoore/stasis-mwi/main/channel.c Tue Mar 5 21:31:17 2013
@@ -160,8 +160,6 @@
struct stasis_topic *__channel_events_all;
struct stasis_caching_topic *__channel_events_all_cached;
-
-struct stasis_subscription *__channel_cache_sub;
/*! \brief map AST_CAUSE's to readable string representations
*
@@ -8668,7 +8666,7 @@
__channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
__channel_events_all = stasis_topic_create("ast_channel_events_all");
- __channel_events_all_cached = stasis_caching_topic_create(__channel_events_all, channel_snapshot_get_id, &__channel_cache_sub);
+ __channel_events_all_cached = stasis_caching_topic_create(__channel_events_all, channel_snapshot_get_id);
ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
Modified: team/kmoore/stasis-mwi/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis_cache.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis_cache.c (original)
+++ team/kmoore/stasis-mwi/main/stasis_cache.c Tue Mar 5 21:31:17 2013
@@ -45,6 +45,7 @@
struct stasis_caching_topic {
struct ao2_container *cache;
struct stasis_topic *topic;
+ struct stasis_subscription *sub;
snapshot_get_id id_fn;
};
@@ -59,6 +60,19 @@
struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
{
return caching_topic->topic;
+}
+
+void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
+{
+ if (caching_topic) {
+ if (caching_topic->sub) {
+ stasis_unsubscribe(caching_topic->sub);
+ caching_topic->sub = NULL;
+ ao2_cleanup(caching_topic);
+ } else {
+ ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
+ }
+ }
}
struct cache_entry {
@@ -146,7 +160,6 @@
/* Remove entry from cache */
cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
old_snapshot = cached_entry->snapshot;
- ao2_ref(old_snapshot, +1);
} else {
/* Insert/update cache */
SCOPED_AO2LOCK(lock, caching_topic->cache);
@@ -389,13 +402,13 @@
}
}
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub)
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
RAII_VAR(char *, new_name, NULL, free);
+ struct stasis_subscription *sub;
int ret;
- ast_assert(sub != NULL);
ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
if (ret < 0) {
return NULL;
@@ -419,12 +432,13 @@
caching_topic->id_fn = id_fn;
- *sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
- if (*sub == NULL) {
+ sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+ if (sub == NULL) {
return NULL;
}
/* This is for the reference contained in the subscription above */
ao2_ref(caching_topic, +1);
+ caching_topic->sub = sub;
ao2_ref(caching_topic, +1);
return caching_topic;
Modified: team/kmoore/stasis-mwi/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/tests/test_stasis.c?view=diff&rev=382480&r1=382479&r2=382480
==============================================================================
--- team/kmoore/stasis-mwi/tests/test_stasis.c (original)
+++ team/kmoore/stasis-mwi/tests/test_stasis.c Tue Mar 5 21:31:17 2013
@@ -487,8 +487,7 @@
{
RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
@@ -521,7 +520,7 @@
actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
- caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
+ caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create();
ast_test_validate(test, NULL != consumer);
@@ -559,8 +558,7 @@
{
RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
@@ -598,7 +596,7 @@
actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
- caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
+ caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create();
ast_test_validate(test, NULL != consumer);
More information about the asterisk-commits
mailing list