[asterisk-commits] dlee: branch dlee/cache-pattern-fix r396134 - in /team/dlee/cache-pattern-fix...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Aug 2 11:48:07 CDT 2013


Author: dlee
Date: Fri Aug  2 11:48:05 2013
New Revision: 396134

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396134
Log:
Forward _all messages to _all_cached

Modified:
    team/dlee/cache-pattern-fix/main/stasis_cache.c
    team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c
    team/dlee/cache-pattern-fix/tests/test_stasis.c

Modified: team/dlee/cache-pattern-fix/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/main/stasis_cache.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/main/stasis_cache.c (original)
+++ team/dlee/cache-pattern-fix/main/stasis_cache.c Fri Aug  2 11:48:05 2013
@@ -426,8 +426,7 @@
 
 	id = caching_topic->cache->id_fn(message);
 	if (id == NULL) {
-		/* Object isn't cached; forward */
-		stasis_forward_message(caching_topic->topic, topic, message);
+		/* Object isn't cached; discard */
 	} else {
 		/* Update the cache */
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);

Modified: team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c (original)
+++ team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c Fri Aug  2 11:48:05 2013
@@ -38,14 +38,16 @@
 	struct stasis_topic *topic;
 	struct stasis_topic *topic_cached;
 	struct stasis_cache *cache;
+
+	struct stasis_subscription *forward_all_to_cached;
 };
 
 struct stasis_cp_single {
 	struct stasis_topic *topic;
 	struct stasis_caching_topic *topic_cached;
 
-	struct stasis_subscription *forward;
-	struct stasis_subscription *forward_cached;
+	struct stasis_subscription *forward_topic_to_all;
+	struct stasis_subscription *forward_cached_to_all;
 };
 
 static void all_dtor(void *obj)
@@ -53,8 +55,13 @@
 	struct stasis_cp_all *all = obj;
 
 	ao2_cleanup(all->topic);
+	all->topic = NULL;
 	ao2_cleanup(all->topic_cached);
+	all->topic_cached = NULL;
 	ao2_cleanup(all->cache);
+	all->cache = NULL;
+	stasis_unsubscribe_and_join(all->forward_all_to_cached);
+	all->forward_all_to_cached = NULL;
 }
 
 struct stasis_cp_all *stasis_cp_all_create(const char *name,
@@ -76,8 +83,11 @@
 	all->topic = stasis_topic_create(name);
 	all->topic_cached = stasis_topic_create(cached_name);
 	all->cache = stasis_cache_create(id_fn);
+	all->forward_all_to_cached =
+		stasis_forward_all(all->topic, all->topic_cached);
 
-	if (!all->topic || !all->topic_cached || !all->cache) {
+	if (!all->topic || !all->topic_cached || !all->cache ||
+		!all->forward_all_to_cached) {
 		return NULL;
 	}
 
@@ -116,8 +126,8 @@
 
 	/* Should already be unsubscribed */
 	ast_assert(one->topic_cached == NULL);
-	ast_assert(one->forward == NULL);
-	ast_assert(one->forward_cached == NULL);
+	ast_assert(one->forward_topic_to_all == NULL);
+	ast_assert(one->forward_cached_to_all == NULL);
 
 	ao2_cleanup(one->topic);
 	one->topic = NULL;
@@ -142,13 +152,13 @@
 		return NULL;
 	}
 
-	one->forward = stasis_forward_all(one->topic, all->topic);
-	if (!one->forward) {
+	one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
+	if (!one->forward_topic_to_all) {
 		return NULL;
 	}
-	one->forward_cached = stasis_forward_all(
+	one->forward_cached_to_all = stasis_forward_all(
 		stasis_caching_get_topic(one->topic_cached), all->topic_cached);
-	if (!one->forward_cached) {
+	if (!one->forward_cached_to_all) {
 		return NULL;
 	}
 
@@ -164,10 +174,10 @@
 
 	stasis_caching_unsubscribe(one->topic_cached);
 	one->topic_cached = NULL;
-	stasis_unsubscribe(one->forward);
-	one->forward = NULL;
-	stasis_unsubscribe(one->forward_cached);
-	one->forward_cached = NULL;
+	stasis_unsubscribe(one->forward_topic_to_all);
+	one->forward_topic_to_all = NULL;
+	stasis_unsubscribe(one->forward_cached_to_all);
+	one->forward_cached_to_all = NULL;
 }
 
 struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)

Modified: team/dlee/cache-pattern-fix/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/tests/test_stasis.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/tests/test_stasis.c (original)
+++ team/dlee/cache-pattern-fix/tests/test_stasis.c Fri Aug  2 11:48:05 2013
@@ -610,7 +610,7 @@
 	return cachable->id;
 }
 
-AST_TEST_DEFINE(cache_passthrough)
+AST_TEST_DEFINE(cache_filter)
 {
 	RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -620,14 +620,13 @@
 	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
 	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
 	int actual_len;
-	struct stasis_message_type *actual_type;
-
-	switch (cmd) {
-	case TEST_INIT:
-		info->name = __func__;
-		info->category = test_category;
-		info->summary = "Test passing messages through cache topic unscathed.";
-		info->description = "Test passing messages through cache topic unscathed.";
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test caching topics only forward cache_update messages.";
+		info->description = "Test caching topics only forward cache_update messages.";
 		return AST_TEST_NOT_RUN;
 	case TEST_EXECUTE:
 		break;
@@ -652,13 +651,8 @@
 
 	stasis_publish(topic, test_message);
 
-	actual_len = consumer_wait_for(consumer, 1);
-	ast_test_validate(test, 1 == actual_len);
-
-	actual_type = stasis_message_type(consumer->messages_rxed[0]);
-	ast_test_validate(test, non_cache_type == actual_type);
-
-	ast_test_validate(test, test_message == consumer->messages_rxed[0]);
+	actual_len = consumer_should_stay(consumer, 0);
+	ast_test_validate(test, 0 == actual_len);
 
 	return AST_TEST_PASS;
 }
@@ -1113,8 +1107,9 @@
 	ast_test_validate(test, 1 == actual_len);
 	actual_len = consumer_wait_for(consumer2, 1);
 	ast_test_validate(test, 1 == actual_len);
-	actual_len = consumer_wait_for(consumer3, 1);
-	ast_test_validate(test, 1 == actual_len);
+	/* Uncacheable message should not be passed through */
+	actual_len = consumer_should_stay(consumer3, 0);
+	ast_test_validate(test, 0 == actual_len);
 
 	actual = consumer1->messages_rxed[0];
 	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
@@ -1127,9 +1122,6 @@
 	update = stasis_message_data(actual);
 	ast_test_validate(test, test_message_type2 == update->type);
 	ast_test_validate(test, test_message2 == update->new_snapshot);
-
-	actual = consumer3->messages_rxed[0];
-	ast_test_validate(test, test_message3 == actual);
 
 	/* consumer1 and consumer2 do not get the final message. */
 	ao2_cleanup(consumer1);
@@ -1287,7 +1279,7 @@
 	AST_TEST_UNREGISTER(publish);
 	AST_TEST_UNREGISTER(unsubscribe_stops_messages);
 	AST_TEST_UNREGISTER(forward);
-	AST_TEST_UNREGISTER(cache_passthrough);
+	AST_TEST_UNREGISTER(cache_filter);
 	AST_TEST_UNREGISTER(cache);
 	AST_TEST_UNREGISTER(cache_dump);
 	AST_TEST_UNREGISTER(route_conflicts);
@@ -1309,7 +1301,7 @@
 	AST_TEST_REGISTER(publish);
 	AST_TEST_REGISTER(unsubscribe_stops_messages);
 	AST_TEST_REGISTER(forward);
-	AST_TEST_REGISTER(cache_passthrough);
+	AST_TEST_REGISTER(cache_filter);
 	AST_TEST_REGISTER(cache);
 	AST_TEST_REGISTER(cache_dump);
 	AST_TEST_REGISTER(route_conflicts);




More information about the asterisk-commits mailing list