[asterisk-commits] dlee: branch dlee/cache-pattern-fix r396134 - in /team/dlee/cache-pattern-fix...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Aug 2 11:48:07 CDT 2013
Author: dlee
Date: Fri Aug 2 11:48:05 2013
New Revision: 396134
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396134
Log:
Forward _all messages to _all_cached
Modified:
team/dlee/cache-pattern-fix/main/stasis_cache.c
team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c
team/dlee/cache-pattern-fix/tests/test_stasis.c
Modified: team/dlee/cache-pattern-fix/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/main/stasis_cache.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/main/stasis_cache.c (original)
+++ team/dlee/cache-pattern-fix/main/stasis_cache.c Fri Aug 2 11:48:05 2013
@@ -426,8 +426,7 @@
id = caching_topic->cache->id_fn(message);
if (id == NULL) {
- /* Object isn't cached; forward */
- stasis_forward_message(caching_topic->topic, topic, message);
+ /* Object isn't cached; discard */
} else {
/* Update the cache */
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
Modified: team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c (original)
+++ team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c Fri Aug 2 11:48:05 2013
@@ -38,14 +38,16 @@
struct stasis_topic *topic;
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
+
+ struct stasis_subscription *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward;
- struct stasis_subscription *forward_cached;
+ struct stasis_subscription *forward_topic_to_all;
+ struct stasis_subscription *forward_cached_to_all;
};
static void all_dtor(void *obj)
@@ -53,8 +55,13 @@
struct stasis_cp_all *all = obj;
ao2_cleanup(all->topic);
+ all->topic = NULL;
ao2_cleanup(all->topic_cached);
+ all->topic_cached = NULL;
ao2_cleanup(all->cache);
+ all->cache = NULL;
+ stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ all->forward_all_to_cached = NULL;
}
struct stasis_cp_all *stasis_cp_all_create(const char *name,
@@ -76,8 +83,11 @@
all->topic = stasis_topic_create(name);
all->topic_cached = stasis_topic_create(cached_name);
all->cache = stasis_cache_create(id_fn);
+ all->forward_all_to_cached =
+ stasis_forward_all(all->topic, all->topic_cached);
- if (!all->topic || !all->topic_cached || !all->cache) {
+ if (!all->topic || !all->topic_cached || !all->cache ||
+ !all->forward_all_to_cached) {
return NULL;
}
@@ -116,8 +126,8 @@
/* Should already be unsubscribed */
ast_assert(one->topic_cached == NULL);
- ast_assert(one->forward == NULL);
- ast_assert(one->forward_cached == NULL);
+ ast_assert(one->forward_topic_to_all == NULL);
+ ast_assert(one->forward_cached_to_all == NULL);
ao2_cleanup(one->topic);
one->topic = NULL;
@@ -142,13 +152,13 @@
return NULL;
}
- one->forward = stasis_forward_all(one->topic, all->topic);
- if (!one->forward) {
+ one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
+ if (!one->forward_topic_to_all) {
return NULL;
}
- one->forward_cached = stasis_forward_all(
+ one->forward_cached_to_all = stasis_forward_all(
stasis_caching_get_topic(one->topic_cached), all->topic_cached);
- if (!one->forward_cached) {
+ if (!one->forward_cached_to_all) {
return NULL;
}
@@ -164,10 +174,10 @@
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;
- stasis_unsubscribe(one->forward);
- one->forward = NULL;
- stasis_unsubscribe(one->forward_cached);
- one->forward_cached = NULL;
+ stasis_unsubscribe(one->forward_topic_to_all);
+ one->forward_topic_to_all = NULL;
+ stasis_unsubscribe(one->forward_cached_to_all);
+ one->forward_cached_to_all = NULL;
}
struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)
Modified: team/dlee/cache-pattern-fix/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/tests/test_stasis.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/tests/test_stasis.c (original)
+++ team/dlee/cache-pattern-fix/tests/test_stasis.c Fri Aug 2 11:48:05 2013
@@ -610,7 +610,7 @@
return cachable->id;
}
-AST_TEST_DEFINE(cache_passthrough)
+AST_TEST_DEFINE(cache_filter)
{
RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -620,14 +620,13 @@
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
- struct stasis_message_type *actual_type;
-
- switch (cmd) {
- case TEST_INIT:
- info->name = __func__;
- info->category = test_category;
- info->summary = "Test passing messages through cache topic unscathed.";
- info->description = "Test passing messages through cache topic unscathed.";
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test caching topics only forward cache_update messages.";
+ info->description = "Test caching topics only forward cache_update messages.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
@@ -652,13 +651,8 @@
stasis_publish(topic, test_message);
- actual_len = consumer_wait_for(consumer, 1);
- ast_test_validate(test, 1 == actual_len);
-
- actual_type = stasis_message_type(consumer->messages_rxed[0]);
- ast_test_validate(test, non_cache_type == actual_type);
-
- ast_test_validate(test, test_message == consumer->messages_rxed[0]);
+ actual_len = consumer_should_stay(consumer, 0);
+ ast_test_validate(test, 0 == actual_len);
return AST_TEST_PASS;
}
@@ -1113,8 +1107,9 @@
ast_test_validate(test, 1 == actual_len);
actual_len = consumer_wait_for(consumer2, 1);
ast_test_validate(test, 1 == actual_len);
- actual_len = consumer_wait_for(consumer3, 1);
- ast_test_validate(test, 1 == actual_len);
+ /* Uncacheable message should not be passed through */
+ actual_len = consumer_should_stay(consumer3, 0);
+ ast_test_validate(test, 0 == actual_len);
actual = consumer1->messages_rxed[0];
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
@@ -1127,9 +1122,6 @@
update = stasis_message_data(actual);
ast_test_validate(test, test_message_type2 == update->type);
ast_test_validate(test, test_message2 == update->new_snapshot);
-
- actual = consumer3->messages_rxed[0];
- ast_test_validate(test, test_message3 == actual);
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup(consumer1);
@@ -1287,7 +1279,7 @@
AST_TEST_UNREGISTER(publish);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
- AST_TEST_UNREGISTER(cache_passthrough);
+ AST_TEST_UNREGISTER(cache_filter);
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(route_conflicts);
@@ -1309,7 +1301,7 @@
AST_TEST_REGISTER(publish);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
- AST_TEST_REGISTER(cache_passthrough);
+ AST_TEST_REGISTER(cache_filter);
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(route_conflicts);
More information about the asterisk-commits mailing list