[svn-commits] dlee: branch dlee/cache-router r394390 - in /team/dlee/cache-router: main/ tests/

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Jul 15 13:38:14 CDT 2013


Author: dlee
Date: Mon Jul 15 13:38:12 2013
New Revision: 394390

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=394390
Log:
Tests; use the new update router

Modified:
    team/dlee/cache-router/main/cdr.c
    team/dlee/cache-router/main/manager_bridging.c
    team/dlee/cache-router/main/manager_channels.c
    team/dlee/cache-router/main/stasis_message_router.c
    team/dlee/cache-router/tests/test_stasis.c

Modified: team/dlee/cache-router/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/cdr.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/cdr.c (original)
+++ team/dlee/cache-router/main/cdr.c Mon Jul 15 13:38:12 2013
@@ -1971,9 +1971,7 @@
 	struct cdr_object *it_cdr;
 
 	ast_assert(update != NULL);
-	if (ast_channel_snapshot_type() != update->type) {
-		return;
-	}
+	ast_assert(ast_channel_snapshot_type() == update->type);
 
 	old_snapshot = stasis_message_data(update->old_snapshot);
 	new_snapshot = stasis_message_data(update->new_snapshot);
@@ -4022,7 +4020,7 @@
 	if (!stasis_router) {
 		return -1;
 	}
-	stasis_message_router_add(stasis_router, stasis_cache_update_type(), handle_channel_cache_message, NULL);
+	stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
 	stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
 	stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
 	stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);

Modified: team/dlee/cache-router/main/manager_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/manager_bridging.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/manager_bridging.c (original)
+++ team/dlee/cache-router/main/manager_bridging.c Mon Jul 15 13:38:12 2013
@@ -195,9 +195,7 @@
 
 	update = stasis_message_data(message);
 
-	if (ast_bridge_snapshot_type() != update->type) {
-		return;
-	}
+	ast_assert(ast_bridge_snapshot_type() == update->type);
 
 	old_snapshot = stasis_message_data(update->old_snapshot);
 	new_snapshot = stasis_message_data(update->new_snapshot);
@@ -468,35 +466,22 @@
 		return -1;
 	}
 
-	/* BUGBUG - This should really route off of the manager_router, but
-	 * can't b/c manager_channels is already routing the
-	 * stasis_cache_update_type() messages. Having a separate router can
-	 * cause some message ordering issues with bridge and channel messages.
-	 */
-	bridge_state_router = stasis_message_router_create(bridge_topic);
+	bridge_state_router = ast_manager_get_message_router();
 	if (!bridge_state_router) {
 		return -1;
 	}
 
+	ret |= stasis_message_router_add_cache_update(bridge_state_router,
+		ast_bridge_snapshot_type(), bridge_snapshot_update, NULL);
+
 	ret |= stasis_message_router_add(bridge_state_router,
-					 stasis_cache_update_type(),
-					 bridge_snapshot_update,
-					 NULL);
+		ast_bridge_merge_message_type(), bridge_merge_cb, NULL);
 
 	ret |= stasis_message_router_add(bridge_state_router,
-					 ast_bridge_merge_message_type(),
-					 bridge_merge_cb,
-					 NULL);
+		ast_channel_entered_bridge_type(), channel_enter_cb, NULL);
 
 	ret |= stasis_message_router_add(bridge_state_router,
-					 ast_channel_entered_bridge_type(),
-					 channel_enter_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(bridge_state_router,
-					 ast_channel_left_bridge_type(),
-					 channel_leave_cb,
-					 NULL);
+		ast_channel_left_bridge_type(), channel_leave_cb, NULL);
 
 	ret |= ast_manager_register_xml_core("BridgeList", 0, manager_bridges_list);
 	ret |= ast_manager_register_xml_core("BridgeInfo", 0, manager_bridge_info);

Modified: team/dlee/cache-router/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/manager_channels.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/manager_channels.c (original)
+++ team/dlee/cache-router/main/manager_channels.c Mon Jul 15 13:38:12 2013
@@ -720,9 +720,7 @@
 
 	update = stasis_message_data(message);
 
-	if (ast_channel_snapshot_type() != update->type) {
-		return;
-	}
+	ast_assert(ast_channel_snapshot_type() == update->type);
 
 	old_snapshot = stasis_message_data(update->old_snapshot);
 	new_snapshot = stasis_message_data(update->new_snapshot);
@@ -1271,85 +1269,57 @@
 
 	ast_register_atexit(manager_channels_shutdown);
 
-	ret |= stasis_message_router_add(message_router,
-					 stasis_cache_update_type(),
-					 channel_snapshot_update,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_user_event_type(),
-					 channel_user_event_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_dtmf_begin_type(),
-					 channel_dtmf_begin_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_dtmf_end_type(),
-					 channel_dtmf_end_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_hangup_request_type(),
-					 channel_hangup_request_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_dial_type(),
-					 channel_dial_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_hold_type(),
-					 channel_hold_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_unhold_type(),
-					 channel_unhold_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_fax_type(),
-					 channel_fax_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_chanspy_start_type(),
-					 channel_chanspy_start_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_chanspy_stop_type(),
-					 channel_chanspy_stop_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_hangup_handler_type(),
-					 channel_hangup_handler_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_moh_start_type(),
-					 channel_moh_start_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_moh_stop_type(),
-					 channel_moh_stop_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_monitor_start_type(),
-					 channel_monitor_start_cb,
-					 NULL);
-
-	ret |= stasis_message_router_add(message_router,
-					 ast_channel_monitor_stop_type(),
-					 channel_monitor_stop_cb,
-					 NULL);
+	ret |= stasis_message_router_add_cache_update(message_router,
+		ast_channel_snapshot_type(), channel_snapshot_update, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_user_event_type(), channel_user_event_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_dtmf_end_type(), channel_dtmf_end_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_hangup_request_type(), channel_hangup_request_cb,
+		NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_dial_type(), channel_dial_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_hold_type(), channel_hold_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_unhold_type(), channel_unhold_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_fax_type(), channel_fax_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_chanspy_start_type(), channel_chanspy_start_cb,
+		NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_chanspy_stop_type(), channel_chanspy_stop_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_hangup_handler_type(), channel_hangup_handler_cb,
+		NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_moh_start_type(), channel_moh_start_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_moh_stop_type(), channel_moh_stop_cb, NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_monitor_start_type(), channel_monitor_start_cb,
+		NULL);
+
+	ret |= stasis_message_router_add(message_router,
+		ast_channel_monitor_stop_type(), channel_monitor_stop_cb, NULL);
 
 	/* If somehow we failed to add any routes, just shut down the whole
 	 * thing and fail it.

Modified: team/dlee/cache-router/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/stasis_message_router.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/main/stasis_message_router.c (original)
+++ team/dlee/cache-router/main/stasis_message_router.c Mon Jul 15 13:38:12 2013
@@ -114,7 +114,7 @@
 		/* Find a cache route */
 		struct stasis_cache_update *update =
 			stasis_message_data(message);
-		route = ao2_find(router->routes, update->type, OBJ_KEY);
+		route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
 	}
 
 	if (route == NULL) {

Modified: team/dlee/cache-router/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/tests/test_stasis.c?view=diff&rev=394390&r1=394389&r2=394390
==============================================================================
--- team/dlee/cache-router/tests/test_stasis.c (original)
+++ team/dlee/cache-router/tests/test_stasis.c Mon Jul 15 13:38:12 2013
@@ -230,7 +230,7 @@
 {
 	struct timeval start = ast_tvnow();
 	struct timespec end = {
-		.tv_sec = start.tv_sec + 30,
+		.tv_sec = start.tv_sec + 3,
 		.tv_nsec = start.tv_usec * 1000
 	};
 
@@ -867,7 +867,7 @@
 AST_TEST_DEFINE(route_conflicts)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
 	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
@@ -913,7 +913,7 @@
 AST_TEST_DEFINE(router)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
 	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
@@ -1006,6 +1006,126 @@
 	return AST_TEST_PASS;
 }
 
+static const char *cache_simple(struct stasis_message *message) {
+	const char *type_name =
+		stasis_message_type_name(stasis_message_type(message));
+	if (!ast_begins_with(type_name, "Cache")) {
+		return NULL;
+	}
+
+	return "cached";
+}
+
+AST_TEST_DEFINE(router_cache_updates)
+{
+	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+	RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
+	struct stasis_cache_update *update;
+	int actual_len, ret;
+	struct stasis_message *actual;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test special handling cache_update messages";
+		info->description = "Test special handling cache_update messages";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	topic = stasis_topic_create("TestTopic");
+	ast_test_validate(test, NULL != topic);
+
+	caching_topic = stasis_caching_topic_create(topic, cache_simple);
+	ast_test_validate(test, NULL != caching_topic);
+
+	consumer1 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer1);
+	consumer2 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer2);
+	consumer3 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer3);
+
+	test_message_type1 = stasis_message_type_create("Cache1", NULL);
+	ast_test_validate(test, NULL != test_message_type1);
+	test_message_type2 = stasis_message_type_create("Cache2", NULL);
+	ast_test_validate(test, NULL != test_message_type2);
+	test_message_type3 = stasis_message_type_create("NonCache", NULL);
+	ast_test_validate(test, NULL != test_message_type3);
+
+	uut = stasis_message_router_create(
+		stasis_caching_get_topic(caching_topic));
+	ast_test_validate(test, NULL != uut);
+
+	ret = stasis_message_router_add_cache_update(
+		uut, test_message_type1, consumer_exec, consumer1);
+	ast_test_validate(test, 0 == ret);
+	ao2_ref(consumer1, +1);
+	ret = stasis_message_router_add(
+		uut, stasis_cache_update_type(), consumer_exec, consumer2);
+	ast_test_validate(test, 0 == ret);
+	ao2_ref(consumer2, +1);
+	ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+	ast_test_validate(test, 0 == ret);
+	ao2_ref(consumer3, +1);
+
+	test_data = ao2_alloc(1, NULL);
+	ast_test_validate(test, NULL != test_data);
+	test_message1 = stasis_message_create(test_message_type1, test_data);
+	ast_test_validate(test, NULL != test_message1);
+	test_message2 = stasis_message_create(test_message_type2, test_data);
+	ast_test_validate(test, NULL != test_message2);
+	test_message3 = stasis_message_create(test_message_type3, test_data);
+	ast_test_validate(test, NULL != test_message3);
+
+	stasis_publish(topic, test_message1);
+	stasis_publish(topic, test_message2);
+	stasis_publish(topic, test_message3);
+
+	actual_len = consumer_wait_for(consumer1, 1);
+	ast_test_validate(test, 1 == actual_len);
+	actual_len = consumer_wait_for(consumer2, 1);
+	ast_test_validate(test, 1 == actual_len);
+	actual_len = consumer_wait_for(consumer3, 1);
+	ast_test_validate(test, 1 == actual_len);
+
+	actual = consumer1->messages_rxed[0];
+	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+	update = stasis_message_data(actual);
+	ast_test_validate(test, test_message_type1 == update->type);
+	ast_test_validate(test, test_message1 == update->new_snapshot);
+
+	actual = consumer2->messages_rxed[0];
+	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+	update = stasis_message_data(actual);
+	ast_test_validate(test, test_message_type2 == update->type);
+	ast_test_validate(test, test_message2 == update->new_snapshot);
+
+	actual = consumer3->messages_rxed[0];
+	ast_test_validate(test, test_message3 == actual);
+
+	/* consumer1 and consumer2 do not get the final message. */
+	ao2_cleanup(consumer1);
+	ao2_cleanup(consumer2);
+
+	return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(no_to_json)
 {
 	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
@@ -1160,6 +1280,7 @@
 	AST_TEST_UNREGISTER(cache_dump);
 	AST_TEST_UNREGISTER(route_conflicts);
 	AST_TEST_UNREGISTER(router);
+	AST_TEST_UNREGISTER(router_cache_updates);
 	AST_TEST_UNREGISTER(interleaving);
 	AST_TEST_UNREGISTER(no_to_json);
 	AST_TEST_UNREGISTER(to_json);
@@ -1181,6 +1302,7 @@
 	AST_TEST_REGISTER(cache_dump);
 	AST_TEST_REGISTER(route_conflicts);
 	AST_TEST_REGISTER(router);
+	AST_TEST_REGISTER(router_cache_updates);
 	AST_TEST_REGISTER(interleaving);
 	AST_TEST_REGISTER(no_to_json);
 	AST_TEST_REGISTER(to_json);




More information about the svn-commits mailing list