[asterisk-commits] dlee: branch dlee/stasis-router r383052 - in /team/dlee/stasis-router: includ...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Mar 14 09:06:28 CDT 2013


Author: dlee
Date: Thu Mar 14 09:06:24 2013
New Revision: 383052

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383052
Log:
Route + cleanup correctness; docs

Modified:
    team/dlee/stasis-router/include/asterisk/stasis_message_router.h
    team/dlee/stasis-router/main/stasis_message_router.c
    team/dlee/stasis-router/tests/test_stasis.c

Modified: team/dlee/stasis-router/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-router/include/asterisk/stasis_message_router.h?view=diff&rev=383052&r1=383051&r2=383052
==============================================================================
--- team/dlee/stasis-router/include/asterisk/stasis_message_router.h (original)
+++ team/dlee/stasis-router/include/asterisk/stasis_message_router.h Thu Mar 14 09:06:24 2013
@@ -27,11 +27,17 @@
  * the subscription handler, it is much cleaner to specify a different callback
  * for each message type. The \ref stasis_message_router is here to help!
  *
- * Once constructed (using stasis_message_router_create()), the router can be
- * subscribed to any number of \ref stasis_topic's.
+ * A \ref stasis_message_router is constructed for a particular \ref
+ * stasis_topic, which is subscribes to. Call
+ * stasis_message_router_unsubscribe() to cancel that subscription.
  *
- * Individual message handlers may be added using stasis_message_router_add(). A
- * default route may be added using stasis_message_router_add_default().
+ * Once constructed, routes can be added using stasis_message_router_add() (or
+ * stasis_message_router_set_default() for any messages not handled by other
+ * routes). There may be only one route per \ref stasis_message_type. The
+ * route's \a callback is invoked just as if it were a callback for a
+ * subscription; but it only gets called for messages of the specified type.
+ *
+ * \since 12
  */
 
 #include "asterisk/stasis.h"
@@ -79,4 +85,5 @@
 int stasis_message_router_set_default(struct stasis_message_router *router,
 				      stasis_subscription_cb callback,
 				      void *data);
+
 #endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */

Modified: team/dlee/stasis-router/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-router/main/stasis_message_router.c?view=diff&rev=383052&r1=383051&r2=383052
==============================================================================
--- team/dlee/stasis-router/main/stasis_message_router.c (original)
+++ team/dlee/stasis-router/main/stasis_message_router.c Thu Mar 14 09:06:24 2013
@@ -95,23 +95,6 @@
 	struct stasis_message_type *type = stasis_message_type(message);
 	size_t i;
 
-	if (stasis_subscription_final_message(sub, message)) {
-		ao2_lock(router);
-
-		for (i = 0; i < router->num_routes_current; ++i) {
-			router->routes[i]->callback(
-				router->routes[i]->data, sub, topic, message);
-		}
-
-		router->default_route->callback(
-			router->default_route->data, sub, topic, message);
-
-		ao2_unlock(router);
-
-		ao2_cleanup(router);
-		return;
-	}
-
 	{
 		SCOPED_AO2LOCK(lock, router);
 
@@ -138,6 +121,11 @@
 		route->callback(route->data, sub, topic, message);
 	}
 
+	if (stasis_subscription_final_message(sub, message)) {
+		ao2_cleanup(router);
+		return;
+	}
+
 }
 
 struct stasis_message_router *stasis_message_router_create(
@@ -202,7 +190,15 @@
 		     struct stasis_message_route *route)
 {
 	struct stasis_message_route **routes;
+	size_t i;
 	SCOPED_AO2LOCK(lock, router);
+
+	/* Check for route conflicts */
+	for (i = 0; i < router->num_routes_current; ++i) {
+		if (router->routes[i]->message_type == route->message_type) {
+			return -1;
+		}
+	}
 
 	/* Increase list size, if needed */
 	if (router->num_routes_current + 1 > router->num_routes_max) {
@@ -215,6 +211,7 @@
 		router->num_routes_max *= 2;
 	}
 
+
 	ao2_ref(route, +1);
 	router->routes[router->num_routes_current++] = route;
 	return 0;

Modified: team/dlee/stasis-router/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-router/tests/test_stasis.c?view=diff&rev=383052&r1=383051&r2=383052
==============================================================================
--- team/dlee/stasis-router/tests/test_stasis.c (original)
+++ team/dlee/stasis-router/tests/test_stasis.c Thu Mar 14 09:06:24 2013
@@ -677,6 +677,52 @@
 	return AST_TEST_PASS;
 }
 
+AST_TEST_DEFINE(route_conflicts)
+{
+	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+	int ret;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary =
+			"Multiple routes to the same message_type should fail";
+		info->description =
+			"Multiple routes to the same message_type should fail";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	topic = stasis_topic_create("TestTopic");
+	ast_test_validate(test, NULL != topic);
+
+	consumer1 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer1);
+	consumer2 = consumer_create(1);
+	ast_test_validate(test, NULL != consumer2);
+
+	test_message_type = stasis_message_type_create("TestMessage");
+	ast_test_validate(test, NULL != test_message_type);
+
+	uut = stasis_message_router_create(topic);
+	ast_test_validate(test, NULL != uut);
+
+	ret = stasis_message_router_add(
+		uut, test_message_type, consumer_exec, consumer1);
+	ast_test_validate(test, 0 == ret);
+	ret = stasis_message_router_add(
+		uut, test_message_type, consumer_exec, consumer2);
+	ast_test_validate(test, 0 != ret);
+
+	return AST_TEST_PASS;
+}
+
 AST_TEST_DEFINE(router)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -725,10 +771,12 @@
 	uut = stasis_message_router_create(topic);
 	ast_test_validate(test, NULL != uut);
 
-	ret = stasis_message_router_add(uut, test_message_type1, consumer_exec, consumer1);
+	ret = stasis_message_router_add(
+		uut, test_message_type1, consumer_exec, consumer1);
 	ast_test_validate(test, 0 == ret);
 	ao2_ref(consumer1, +1);
-	ret = stasis_message_router_add(uut, test_message_type2, consumer_exec, consumer2);
+	ret = stasis_message_router_add(
+		uut, test_message_type2, consumer_exec, consumer2);
 	ast_test_validate(test, 0 == ret);
 	ao2_ref(consumer2, +1);
 	ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
@@ -745,28 +793,28 @@
 	ast_test_validate(test, NULL != test_message3);
 
 	stasis_publish(topic, test_message1);
+	stasis_publish(topic, test_message2);
+	stasis_publish(topic, test_message3);
+
 	actual_len = consumer_wait_for(consumer1, 1);
 	ast_test_validate(test, 1 == actual_len);
+	actual_len = consumer_wait_for(consumer2, 1);
+	ast_test_validate(test, 1 == actual_len);
+	actual_len = consumer_wait_for(consumer3, 1);
+	ast_test_validate(test, 1 == actual_len);
+
 	actual = consumer1->messages_rxed[0];
 	ast_test_validate(test, test_message1 == actual);
-	ast_test_validate(test, 0 == consumer_should_stay(consumer2, 0));
-	ast_test_validate(test, 0 == consumer_should_stay(consumer3, 0));
-
-	stasis_publish(topic, test_message2);
-	actual_len = consumer_wait_for(consumer2, 1);
-	ast_test_validate(test, 1 == actual_len);
+
 	actual = consumer2->messages_rxed[0];
 	ast_test_validate(test, test_message2 == actual);
-	ast_test_validate(test, 1 == consumer_should_stay(consumer1, 1));
-	ast_test_validate(test, 0 == consumer_should_stay(consumer3, 0));
-
-	stasis_publish(topic, test_message3);
-	actual_len = consumer_wait_for(consumer3, 1);
-	ast_test_validate(test, 1 == actual_len);
+
 	actual = consumer3->messages_rxed[0];
 	ast_test_validate(test, test_message3 == actual);
-	ast_test_validate(test, 1 == consumer_should_stay(consumer1, 1));
-	ast_test_validate(test, 1 == consumer_should_stay(consumer2, 1));
+
+	/* consumer1 and consumer2 do not get the final message. */
+	ao2_cleanup(consumer1);
+	ao2_cleanup(consumer2);
 
 	return AST_TEST_PASS;
 }
@@ -781,6 +829,7 @@
 	AST_TEST_UNREGISTER(forward);
 	AST_TEST_UNREGISTER(cache_passthrough);
 	AST_TEST_UNREGISTER(cache);
+	AST_TEST_UNREGISTER(route_conflicts);
 	AST_TEST_UNREGISTER(router);
 	return 0;
 }
@@ -795,6 +844,7 @@
 	AST_TEST_REGISTER(forward);
 	AST_TEST_REGISTER(cache_passthrough);
 	AST_TEST_REGISTER(cache);
+	AST_TEST_REGISTER(route_conflicts);
 	AST_TEST_REGISTER(router);
 	return AST_MODULE_LOAD_SUCCESS;
 }




More information about the asterisk-commits mailing list