[asterisk-commits] dlee: branch dlee/stasis-router r383052 - in /team/dlee/stasis-router: includ...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Mar 14 09:06:28 CDT 2013
Author: dlee
Date: Thu Mar 14 09:06:24 2013
New Revision: 383052
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383052
Log:
Route + cleanup correctness; docs
Modified:
team/dlee/stasis-router/include/asterisk/stasis_message_router.h
team/dlee/stasis-router/main/stasis_message_router.c
team/dlee/stasis-router/tests/test_stasis.c
Modified: team/dlee/stasis-router/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-router/include/asterisk/stasis_message_router.h?view=diff&rev=383052&r1=383051&r2=383052
==============================================================================
--- team/dlee/stasis-router/include/asterisk/stasis_message_router.h (original)
+++ team/dlee/stasis-router/include/asterisk/stasis_message_router.h Thu Mar 14 09:06:24 2013
@@ -27,11 +27,17 @@
* the subscription handler, it is much cleaner to specify a different callback
* for each message type. The \ref stasis_message_router is here to help!
*
- * Once constructed (using stasis_message_router_create()), the router can be
- * subscribed to any number of \ref stasis_topic's.
+ * A \ref stasis_message_router is constructed for a particular \ref
+ * stasis_topic, which it subscribes to. Call
+ * stasis_message_router_unsubscribe() to cancel that subscription.
*
- * Individual message handlers may be added using stasis_message_router_add(). A
- * default route may be added using stasis_message_router_add_default().
+ * Once constructed, routes can be added using stasis_message_router_add() (or
+ * stasis_message_router_set_default() for any messages not handled by other
+ * routes). There may be only one route per \ref stasis_message_type. The
+ * route's \a callback is invoked just as if it were a callback for a
+ * subscription; but it only gets called for messages of the specified type.
+ *
+ * \since 12
*/
#include "asterisk/stasis.h"
@@ -79,4 +85,5 @@
int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data);
+
#endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */
Modified: team/dlee/stasis-router/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-router/main/stasis_message_router.c?view=diff&rev=383052&r1=383051&r2=383052
==============================================================================
--- team/dlee/stasis-router/main/stasis_message_router.c (original)
+++ team/dlee/stasis-router/main/stasis_message_router.c Thu Mar 14 09:06:24 2013
@@ -95,23 +95,6 @@
struct stasis_message_type *type = stasis_message_type(message);
size_t i;
- if (stasis_subscription_final_message(sub, message)) {
- ao2_lock(router);
-
- for (i = 0; i < router->num_routes_current; ++i) {
- router->routes[i]->callback(
- router->routes[i]->data, sub, topic, message);
- }
-
- router->default_route->callback(
- router->default_route->data, sub, topic, message);
-
- ao2_unlock(router);
-
- ao2_cleanup(router);
- return;
- }
-
{
SCOPED_AO2LOCK(lock, router);
@@ -138,6 +121,11 @@
route->callback(route->data, sub, topic, message);
}
+ if (stasis_subscription_final_message(sub, message)) {
+ ao2_cleanup(router);
+ return;
+ }
+
}
struct stasis_message_router *stasis_message_router_create(
@@ -202,7 +190,15 @@
struct stasis_message_route *route)
{
struct stasis_message_route **routes;
+ size_t i;
SCOPED_AO2LOCK(lock, router);
+
+ /* Check for route conflicts */
+ for (i = 0; i < router->num_routes_current; ++i) {
+ if (router->routes[i]->message_type == route->message_type) {
+ return -1;
+ }
+ }
/* Increase list size, if needed */
if (router->num_routes_current + 1 > router->num_routes_max) {
@@ -215,6 +211,7 @@
router->num_routes_max *= 2;
}
+
ao2_ref(route, +1);
router->routes[router->num_routes_current++] = route;
return 0;
Modified: team/dlee/stasis-router/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-router/tests/test_stasis.c?view=diff&rev=383052&r1=383051&r2=383052
==============================================================================
--- team/dlee/stasis-router/tests/test_stasis.c (original)
+++ team/dlee/stasis-router/tests/test_stasis.c Thu Mar 14 09:06:24 2013
@@ -677,6 +677,52 @@
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(route_conflicts)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+ int ret;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary =
+ "Multiple routes to the same message_type should fail";
+ info->description =
+ "Multiple routes to the same message_type should fail";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+
+ test_message_type = stasis_message_type_create("TestMessage");
+ ast_test_validate(test, NULL != test_message_type);
+
+ uut = stasis_message_router_create(topic);
+ ast_test_validate(test, NULL != uut);
+
+ ret = stasis_message_router_add(
+ uut, test_message_type, consumer_exec, consumer1);
+ ast_test_validate(test, 0 == ret);
+ ret = stasis_message_router_add(
+ uut, test_message_type, consumer_exec, consumer2);
+ ast_test_validate(test, 0 != ret);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -725,10 +771,12 @@
uut = stasis_message_router_create(topic);
ast_test_validate(test, NULL != uut);
- ret = stasis_message_router_add(uut, test_message_type1, consumer_exec, consumer1);
+ ret = stasis_message_router_add(
+ uut, test_message_type1, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer1, +1);
- ret = stasis_message_router_add(uut, test_message_type2, consumer_exec, consumer2);
+ ret = stasis_message_router_add(
+ uut, test_message_type2, consumer_exec, consumer2);
ast_test_validate(test, 0 == ret);
ao2_ref(consumer2, +1);
ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
@@ -745,28 +793,28 @@
ast_test_validate(test, NULL != test_message3);
stasis_publish(topic, test_message1);
+ stasis_publish(topic, test_message2);
+ stasis_publish(topic, test_message3);
+
actual_len = consumer_wait_for(consumer1, 1);
ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer2, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer3, 1);
+ ast_test_validate(test, 1 == actual_len);
+
actual = consumer1->messages_rxed[0];
ast_test_validate(test, test_message1 == actual);
- ast_test_validate(test, 0 == consumer_should_stay(consumer2, 0));
- ast_test_validate(test, 0 == consumer_should_stay(consumer3, 0));
-
- stasis_publish(topic, test_message2);
- actual_len = consumer_wait_for(consumer2, 1);
- ast_test_validate(test, 1 == actual_len);
+
actual = consumer2->messages_rxed[0];
ast_test_validate(test, test_message2 == actual);
- ast_test_validate(test, 1 == consumer_should_stay(consumer1, 1));
- ast_test_validate(test, 0 == consumer_should_stay(consumer3, 0));
-
- stasis_publish(topic, test_message3);
- actual_len = consumer_wait_for(consumer3, 1);
- ast_test_validate(test, 1 == actual_len);
+
actual = consumer3->messages_rxed[0];
ast_test_validate(test, test_message3 == actual);
- ast_test_validate(test, 1 == consumer_should_stay(consumer1, 1));
- ast_test_validate(test, 1 == consumer_should_stay(consumer2, 1));
+
+ /* consumer1 and consumer2 do not get the final message. */
+ ao2_cleanup(consumer1);
+ ao2_cleanup(consumer2);
return AST_TEST_PASS;
}
@@ -781,6 +829,7 @@
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_passthrough);
AST_TEST_UNREGISTER(cache);
+ AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
return 0;
}
@@ -795,6 +844,7 @@
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_passthrough);
AST_TEST_REGISTER(cache);
+ AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
return AST_MODULE_LOAD_SUCCESS;
}
More information about the asterisk-commits
mailing list