[svn-commits] dlee: branch dlee/cache-router r394390 - in /team/dlee/cache-router: main/ te...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Mon Jul 15 13:38:14 CDT 2013
Author: dlee
Date: Mon Jul 15 13:38:12 2013
New Revision: 394390
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=394390
Log:
Tests; use the new update router
Modified:
team/dlee/cache-router/main/cdr.c
team/dlee/cache-router/main/manager_bridging.c
team/dlee/cache-router/main/manager_channels.c
team/dlee/cache-router/main/stasis_message_router.c
team/dlee/cache-router/tests/test_stasis.c
Modified: team/dlee/cache-router/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/cdr.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/cdr.c (original)
+++ team/dlee/cache-router/main/cdr.c Mon Jul 15 13:38:12 2013
@@ -1971,9 +1971,7 @@
struct cdr_object *it_cdr;
ast_assert(update != NULL);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
@@ -4022,7 +4020,7 @@
if (!stasis_router) {
return -1;
}
- stasis_message_router_add(stasis_router, stasis_cache_update_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
Modified: team/dlee/cache-router/main/manager_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/manager_bridging.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/manager_bridging.c (original)
+++ team/dlee/cache-router/main/manager_bridging.c Mon Jul 15 13:38:12 2013
@@ -195,9 +195,7 @@
update = stasis_message_data(message);
- if (ast_bridge_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_bridge_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
@@ -468,35 +466,22 @@
return -1;
}
- /* BUGBUG - This should really route off of the manager_router, but
- * can't b/c manager_channels is already routing the
- * stasis_cache_update_type() messages. Having a separate router can
- * cause some message ordering issues with bridge and channel messages.
- */
- bridge_state_router = stasis_message_router_create(bridge_topic);
+ bridge_state_router = ast_manager_get_message_router();
if (!bridge_state_router) {
return -1;
}
+ ret |= stasis_message_router_add_cache_update(bridge_state_router,
+ ast_bridge_snapshot_type(), bridge_snapshot_update, NULL);
+
ret |= stasis_message_router_add(bridge_state_router,
- stasis_cache_update_type(),
- bridge_snapshot_update,
- NULL);
+ ast_bridge_merge_message_type(), bridge_merge_cb, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_bridge_merge_message_type(),
- bridge_merge_cb,
- NULL);
+ ast_channel_entered_bridge_type(), channel_enter_cb, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_channel_entered_bridge_type(),
- channel_enter_cb,
- NULL);
-
- ret |= stasis_message_router_add(bridge_state_router,
- ast_channel_left_bridge_type(),
- channel_leave_cb,
- NULL);
+ ast_channel_left_bridge_type(), channel_leave_cb, NULL);
ret |= ast_manager_register_xml_core("BridgeList", 0, manager_bridges_list);
ret |= ast_manager_register_xml_core("BridgeInfo", 0, manager_bridge_info);
Modified: team/dlee/cache-router/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/manager_channels.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/manager_channels.c (original)
+++ team/dlee/cache-router/main/manager_channels.c Mon Jul 15 13:38:12 2013
@@ -720,9 +720,7 @@
update = stasis_message_data(message);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
@@ -1271,85 +1269,57 @@
ast_register_atexit(manager_channels_shutdown);
- ret |= stasis_message_router_add(message_router,
- stasis_cache_update_type(),
- channel_snapshot_update,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_user_event_type(),
- channel_user_event_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_dtmf_begin_type(),
- channel_dtmf_begin_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_dtmf_end_type(),
- channel_dtmf_end_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_hangup_request_type(),
- channel_hangup_request_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_dial_type(),
- channel_dial_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_hold_type(),
- channel_hold_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_unhold_type(),
- channel_unhold_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_fax_type(),
- channel_fax_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_chanspy_start_type(),
- channel_chanspy_start_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_chanspy_stop_type(),
- channel_chanspy_stop_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_hangup_handler_type(),
- channel_hangup_handler_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_moh_start_type(),
- channel_moh_start_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_moh_stop_type(),
- channel_moh_stop_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_monitor_start_type(),
- channel_monitor_start_cb,
- NULL);
-
- ret |= stasis_message_router_add(message_router,
- ast_channel_monitor_stop_type(),
- channel_monitor_stop_cb,
- NULL);
+ ret |= stasis_message_router_add_cache_update(message_router,
+ ast_channel_snapshot_type(), channel_snapshot_update, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_user_event_type(), channel_user_event_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_dtmf_end_type(), channel_dtmf_end_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_hangup_request_type(), channel_hangup_request_cb,
+ NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_dial_type(), channel_dial_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_hold_type(), channel_hold_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_unhold_type(), channel_unhold_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_fax_type(), channel_fax_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_chanspy_start_type(), channel_chanspy_start_cb,
+ NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_chanspy_stop_type(), channel_chanspy_stop_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_hangup_handler_type(), channel_hangup_handler_cb,
+ NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_moh_start_type(), channel_moh_start_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_moh_stop_type(), channel_moh_stop_cb, NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_monitor_start_type(), channel_monitor_start_cb,
+ NULL);
+
+ ret |= stasis_message_router_add(message_router,
+ ast_channel_monitor_stop_type(), channel_monitor_stop_cb, NULL);
/* If somehow we failed to add any routes, just shut down the whole
* thing and fail it.
Modified: team/dlee/cache-router/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/stasis_message_router.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/stasis_message_router.c (original)
+++ team/dlee/cache-router/main/stasis_message_router.c Mon Jul 15 13:38:12 2013
@@ -114,7 +114,7 @@
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
- route = ao2_find(router->routes, update->type, OBJ_KEY);
+ route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
}
if (route == NULL) {
Modified: team/dlee/cache-router/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/tests/test_stasis.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/tests/test_stasis.c (original)
+++ team/dlee/cache-router/tests/test_stasis.c Mon Jul 15 13:38:12 2013
@@ -230,7 +230,7 @@
{
struct timeval start = ast_tvnow();
struct timespec end = {
- .tv_sec = start.tv_sec + 30,
+ .tv_sec = start.tv_sec + 3,
.tv_nsec = start.tv_usec * 1000
};
@@ -867,7 +867,7 @@
AST_TEST_DEFINE(route_conflicts)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
@@ -913,7 +913,7 @@
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
@@ -1006,6 +1006,126 @@
return AST_TEST_PASS;
}
+static const char *cache_simple(struct stasis_message *message) {
+ const char *type_name =
+ stasis_message_type_name(stasis_message_type(message));
+ if (!ast_begins_with(type_name, "Cache")) {
+ return NULL;
+ }
+
+ return "cached";
+}
+
+AST_TEST_DEFINE(router_cache_updates)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+ RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
+ struct stasis_cache_update *update;
+ int actual_len, ret;
+ struct stasis_message *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test special handling cache_update messages";
+ info->description = "Test special handling cache_update messages";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ caching_topic = stasis_caching_topic_create(topic, cache_simple);
+ ast_test_validate(test, NULL != caching_topic);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+ consumer3 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer3);
+
+ test_message_type1 = stasis_message_type_create("Cache1", NULL);
+ ast_test_validate(test, NULL != test_message_type1);
+ test_message_type2 = stasis_message_type_create("Cache2", NULL);
+ ast_test_validate(test, NULL != test_message_type2);
+ test_message_type3 = stasis_message_type_create("NonCache", NULL);
+ ast_test_validate(test, NULL != test_message_type3);
+
+ uut = stasis_message_router_create(
+ stasis_caching_get_topic(caching_topic));
+ ast_test_validate(test, NULL != uut);
+
+ ret = stasis_message_router_add_cache_update(
+ uut, test_message_type1, consumer_exec, consumer1);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer1, +1);
+ ret = stasis_message_router_add(
+ uut, stasis_cache_update_type(), consumer_exec, consumer2);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer2, +1);
+ ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer3, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message1 = stasis_message_create(test_message_type1, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type2, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type3, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ stasis_publish(topic, test_message1);
+ stasis_publish(topic, test_message2);
+ stasis_publish(topic, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer2, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer3, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ actual = consumer1->messages_rxed[0];
+ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+ update = stasis_message_data(actual);
+ ast_test_validate(test, test_message_type1 == update->type);
+ ast_test_validate(test, test_message1 == update->new_snapshot);
+
+ actual = consumer2->messages_rxed[0];
+ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+ update = stasis_message_data(actual);
+ ast_test_validate(test, test_message_type2 == update->type);
+ ast_test_validate(test, test_message2 == update->new_snapshot);
+
+ actual = consumer3->messages_rxed[0];
+ ast_test_validate(test, test_message3 == actual);
+
+ /* consumer1 and consumer2 do not get the final message. */
+ ao2_cleanup(consumer1);
+ ao2_cleanup(consumer2);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(no_to_json)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
@@ -1160,6 +1280,7 @@
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
AST_TEST_UNREGISTER(no_to_json);
AST_TEST_UNREGISTER(to_json);
@@ -1181,6 +1302,7 @@
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
AST_TEST_REGISTER(no_to_json);
AST_TEST_REGISTER(to_json);
More information about the svn-commits
mailing list