[asterisk-commits] dlee: branch dlee/cache-router r394388 - in /team/dlee/cache-router: include/...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Jul 15 10:53:57 CDT 2013
Author: dlee
Date: Mon Jul 15 10:53:55 2013
New Revision: 394388
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=394388
Log:
Cache routing
Modified:
team/dlee/cache-router/include/asterisk/stasis_message_router.h
team/dlee/cache-router/main/stasis_message_router.c
Modified: team/dlee/cache-router/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/include/asterisk/stasis_message_router.h?view=diff&rev=394388&r1=394387&r2=394388
==============================================================================
--- team/dlee/cache-router/include/asterisk/stasis_message_router.h (original)
+++ team/dlee/cache-router/include/asterisk/stasis_message_router.h Mon Jul 15 10:53:55 2013
@@ -95,6 +95,11 @@
/*!
* \brief Add a route to a message router.
*
+ * A particular \a message_type may have at most one route per \a router. If
+ * you route \ref stasis_cache_update messages, the callback will only receive
+ * updates for types not handled by routes added with
+ * stasis_message_router_add_cache_update().
+ *
* \param router Router to add the route to.
* \param message_type Type of message to route.
* \param callback Callback to forward messages of \a message_type to.
@@ -106,9 +111,29 @@
* \since 12
*/
int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data);
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data);
+
+/*!
+ * \brief Add a route for \ref stasis_cache_update messages to a message router.
+ *
+ * A particular \a message_type may have at most one cache route per \a router.
+ * These are distinct from regular routes, so one could have both a regular
+ * route and a cache route for the same \a message_type.
+ *
+ * \param router Router to add the route to.
+ * \param message_type Subtype of cache update to route.
+ * \param callback Callback to forward messages of \a message_type to.
+ * \param data Data pointer to pass to \a callback.
+ *
+ * \retval 0 on success
+ * \retval -1 on failure
+ *
+ * \since 12
+ */
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data);
/*!
* \brief Remove a route from a message router.
@@ -119,7 +144,19 @@
* \since 12
*/
void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type);
+ struct stasis_message_type *message_type);
+
+/*!
+ * \brief Remove a cache route from a message router.
+ *
+ * \param router Router to remove the route from.
+ * \param message_type Subtype of cache update whose route should be removed.
+ *
+ * \since 12
+ */
+void stasis_message_router_remove_cache_update(
+ struct stasis_message_router *router,
+ struct stasis_message_type *message_type);
/*!
* \brief Sets the default route of a router.
Modified: team/dlee/cache-router/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/stasis_message_router.c?view=diff&rev=394388&r1=394387&r2=394388
==============================================================================
--- team/dlee/cache-router/main/stasis_message_router.c (original)
+++ team/dlee/cache-router/main/stasis_message_router.c Mon Jul 15 10:53:55 2013
@@ -33,6 +33,9 @@
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
+
+/*! Number of hash buckets for the route table. Keep it prime! */
+#define ROUTE_TABLE_BUCKETS 7
/*! \internal */
struct stasis_message_route {
@@ -75,6 +78,8 @@
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct ao2_container *routes;
+ /*! Subscribed routes for \ref stasis_cache_update messages */
+ struct ao2_container *cache_routes;
/*! Route of last resort */
struct stasis_message_route *default_route;
};
@@ -90,13 +95,46 @@
ao2_cleanup(router->routes);
router->routes = NULL;
+ ao2_cleanup(router->cache_routes);
+ router->cache_routes = NULL;
+
ao2_cleanup(router->default_route);
router->default_route = NULL;
}
-static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type)
-{
- return ao2_find(router->routes, message_type, OBJ_KEY);
+/*!
+ * \internal
+ * \brief Find the route that should handle \a message.
+ *
+ * Lookup order, performed while holding the router lock:
+ *  -# if \a message is a \ref stasis_cache_update, the cache route
+ *     registered for the type carried by the update;
+ *  -# the regular route registered for the message's own type;
+ *  -# the router's default route.
+ *
+ * \param router Router whose route tables are searched.
+ * \param message Message to find a route for.
+ *
+ * \return The matching route, with a reference the caller must release.
+ * \retval NULL if no route matches and no default route is set.
+ */
+static struct stasis_message_route *find_route(
+	struct stasis_message_router *router,
+	struct stasis_message *message)
+{
+	RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+	struct stasis_message_type *type = stasis_message_type(message);
+	SCOPED_AO2LOCK(lock, router);
+
+	if (type == stasis_cache_update_type()) {
+		/*
+		 * Cache routes are kept in their own container. Searching
+		 * router->routes here would never match a cache route and
+		 * would instead hand the cache update to the regular route
+		 * registered for the cached type.
+		 */
+		struct stasis_cache_update *update =
+			stasis_message_data(message);
+		route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+	}
+
+	if (route == NULL) {
+		/* Find a regular route */
+		route = ao2_find(router->routes, type, OBJ_KEY);
+	}
+
+	if (route == NULL) {
+		/* Maybe the default route, then? */
+		if ((route = router->default_route)) {
+			ao2_ref(route, +1);
+		}
+	}
+
+	if (route == NULL) {
+		return NULL;
+	}
+
+	/*
+	 * RAII_VAR releases one reference when this function returns, so
+	 * take an extra one to hand to the caller.
+	 */
+	ao2_ref(route, +1);
+	return route;
+}
static void router_dispatch(void *data,
@@ -105,29 +143,18 @@
struct stasis_message *message)
{
struct stasis_message_router *router = data;
- RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
- struct stasis_message_type *type = stasis_message_type(message);
-
- {
- SCOPED_AO2LOCK(lock, router);
-
- if (!(route = find_route(router, type))) {
- if ((route = router->default_route)) {
- ao2_ref(route, +1);
- }
- }
- }
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+
+ route = find_route(router, message);
if (route) {
route->callback(route->data, sub, topic, message);
}
+
if (stasis_subscription_final_message(sub, message)) {
- router_needs_cleanup = router;
- return;
- }
-
+ ao2_cleanup(router);
+ }
}
struct stasis_message_router *stasis_message_router_create(
@@ -140,7 +167,15 @@
return NULL;
}
- if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) {
+ router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
+ route_cmp);
+ if (!router->routes) {
+ return NULL;
+ }
+
+ router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
+ route_hash, route_cmp);
+ if (!router->cache_routes) {
return NULL;
}
@@ -211,7 +246,10 @@
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
- if ((existing_route = find_route(router, route->message_type))) {
+ existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
@@ -219,10 +257,27 @@
return 0;
}
+/*!
+ * \internal
+ * \brief Link \a route into the router's cache route container.
+ *
+ * The lookup and the link are both done under SCOPED_AO2LOCK on the router.
+ * The add is refused if a cache route keyed on the same message type is
+ * already present.
+ *
+ * \param router Router to add the cache route to.
+ * \param route Route to add; looked up by its \c message_type.
+ *
+ * \retval 0 on success
+ * \retval -1 if a cache route for that message type already exists
+ */
+static int add_cache_route(struct stasis_message_router *router,
+ struct stasis_message_route *route)
+{
+ RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, router);
+
+ existing_route = ao2_find(router->cache_routes, route->message_type,
+ OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
+ return -1;
+ }
+
+ /* NOTE(review): the return value of ao2_link() is not checked here. */
+ ao2_link(router->cache_routes, route);
+ return 0;
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data)
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
@@ -234,12 +289,37 @@
return add_route(router, route);
}
+/*
+ * Builds a route with route_create() and hands it to add_cache_route().
+ * Returns -1 if the route cannot be created, otherwise whatever
+ * add_cache_route() returns.
+ */
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
+{
+ /* The local reference is dropped by ao2_cleanup when the function returns. */
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+
+ route = route_create(message_type, callback, data);
+ if (!route) {
+ return -1;
+ }
+
+ return add_cache_route(router, route);
+}
+
void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type)
-{
- SCOPED_AO2LOCK(lock, router);
-
- ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ struct stasis_message_type *message_type)
+{
+ SCOPED_AO2LOCK(lock, router);
+
+ ao2_find(router->routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+}
+
+/*
+ * Unlinks the cache route keyed on message_type from router->cache_routes
+ * while holding the router lock.
+ */
+void stasis_message_router_remove_cache_update(
+ struct stasis_message_router *router,
+ struct stasis_message_type *message_type)
+{
+ SCOPED_AO2LOCK(lock, router);
+
+ /*
+ * OBJ_UNLINK removes the match from the container. With OBJ_NODATA no
+ * object is expected back, so the result is ignored (per astobj2 docs;
+ * confirm against the astobj2.h in this tree).
+ */
+ ao2_find(router->cache_routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
More information about the asterisk-commits
mailing list