[svn-commits] dlee: branch dlee/cache-router r394388 - in /team/dlee/cache-router: include/...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Jul 15 10:53:57 CDT 2013


Author: dlee
Date: Mon Jul 15 10:53:55 2013
New Revision: 394388

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=394388
Log:
Cache routing

Modified:
    team/dlee/cache-router/include/asterisk/stasis_message_router.h
    team/dlee/cache-router/main/stasis_message_router.c

Modified: team/dlee/cache-router/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/include/asterisk/stasis_message_router.h?view=diff&rev=394388&r1=394387&r2=394388
==============================================================================
--- team/dlee/cache-router/include/asterisk/stasis_message_router.h (original)
+++ team/dlee/cache-router/include/asterisk/stasis_message_router.h Mon Jul 15 10:53:55 2013
@@ -95,6 +95,11 @@
 /*!
  * \brief Add a route to a message router.
  *
+ * A particular \a message_type may have at most one route per \a router. If
+ * you route \ref stasis_cache_update messages, the callback will only receive
+ * updates for types not handled by routes added with
+ * stasis_message_router_add_cache_update().
+ *
  * \param router Router to add the route to.
  * \param message_type Type of message to route.
  * \param callback Callback to forard messages of \a message_type to.
@@ -106,9 +111,29 @@
  * \since 12
  */
 int stasis_message_router_add(struct stasis_message_router *router,
-			      struct stasis_message_type *message_type,
-			      stasis_subscription_cb callback,
-			      void *data);
+	struct stasis_message_type *message_type,
+	stasis_subscription_cb callback, void *data);
+
+/*!
+ * \brief Add a route for \ref stasis_cache_update messages to a message router.
+ *
+ * A particular \a message_type may have at most one cache route per \a router.
+ * These are distinct from regular routes, so one could have both a regular
+ * route and a cache route for the same \a message_type.
+ *
+ * \param router Router to add the route to.
+ * \param message_type Subtype of cache update to route.
+ * \param callback Callback to forward messages of \a message_type to.
+ * \param data Data pointer to pass to \a callback.
+ *
+ * \retval 0 on success
+ * \retval -1 on failure
+ *
+ * \since 12
+ */
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+	struct stasis_message_type *message_type,
+	stasis_subscription_cb callback, void *data);
 
 /*!
  * \brief Remove a route from a message router.
@@ -119,7 +144,19 @@
  * \since 12
  */
 void stasis_message_router_remove(struct stasis_message_router *router,
-			      struct stasis_message_type *message_type);
+	struct stasis_message_type *message_type);
+
+/*!
+ * \brief Remove a cache route from a message router.
+ *
+ * \param router Router to remove the route from.
+ * \param message_type Subtype of cache update whose route is removed.
+ *
+ * \since 12
+ */
+void stasis_message_router_remove_cache_update(
+	struct stasis_message_router *router,
+	struct stasis_message_type *message_type);
 
 /*!
  * \brief Sets the default route of a router.

Modified: team/dlee/cache-router/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-router/main/stasis_message_router.c?view=diff&rev=394388&r1=394387&r2=394388
==============================================================================
--- team/dlee/cache-router/main/stasis_message_router.c (original)
+++ team/dlee/cache-router/main/stasis_message_router.c Mon Jul 15 10:53:55 2013
@@ -33,6 +33,9 @@
 
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_message_router.h"
+
+/*! Number of hash buckets for the route table. Keep it prime! */
+#define ROUTE_TABLE_BUCKETS 7
 
 /*! \internal */
 struct stasis_message_route {
@@ -75,6 +78,8 @@
 	struct stasis_subscription *subscription;
 	/*! Subscribed routes */
 	struct ao2_container *routes;
+	/*! Subscribed routes for \ref stasis_cache_update messages */
+	struct ao2_container *cache_routes;
 	/*! Route of last resort */
 	struct stasis_message_route *default_route;
 };
@@ -90,13 +95,46 @@
 	ao2_cleanup(router->routes);
 	router->routes = NULL;
 
+	ao2_cleanup(router->cache_routes);
+	router->cache_routes = NULL;
+
 	ao2_cleanup(router->default_route);
 	router->default_route = NULL;
 }
 
-static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type)
-{
-	return ao2_find(router->routes, message_type, OBJ_KEY);
+/*!
+ * \internal
+ * \brief Find the route that should handle \a message.
+ *
+ * Lookup order: cache route (for \ref stasis_cache_update messages, keyed on
+ * the update's subtype), then regular route, then the default route.
+ *
+ * \return Route with its reference bumped for the caller; \c NULL if none.
+ */
+static struct stasis_message_route *find_route(
+	struct stasis_message_router *router,
+	struct stasis_message *message)
+{
+	struct stasis_message_route *route = NULL;
+	struct stasis_message_type *type = stasis_message_type(message);
+	SCOPED_AO2LOCK(lock, router);
+
+	if (type == stasis_cache_update_type()) {
+		struct stasis_cache_update *update =
+			stasis_message_data(message);
+		/* Cache routes live in their own container, not ->routes */
+		route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+	}
+	if (route == NULL) {
+		route = ao2_find(router->routes, type, OBJ_KEY);
+	}
+	if (route == NULL && router->default_route != NULL) {
+		/* ao2_find() results are already referenced; match that here */
+		route = router->default_route;
+		ao2_ref(route, +1);
+	}
+
+	return route;
 }
 
 static void router_dispatch(void *data,
@@ -105,29 +143,18 @@
 			    struct stasis_message *message)
 {
 	struct stasis_message_router *router = data;
-	RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-	struct stasis_message_type *type = stasis_message_type(message);
-
-	{
-		SCOPED_AO2LOCK(lock, router);
-
-		if (!(route = find_route(router, type))) {
-			if ((route = router->default_route)) {
-				ao2_ref(route, +1);
-			}
-		}
-	}
+	RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+
+	route = find_route(router, message);
 
 	if (route) {
 		route->callback(route->data, sub, topic, message);
 	}
 
+
 	if (stasis_subscription_final_message(sub, message)) {
-		router_needs_cleanup = router;
-		return;
-	}
-
+		ao2_cleanup(router);
+	}
 }
 
 struct stasis_message_router *stasis_message_router_create(
@@ -140,7 +167,15 @@
 		return NULL;
 	}
 
-	if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) {
+	router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
+		route_cmp);
+	if (!router->routes) {
+		return NULL;
+	}
+
+	router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
+		route_hash, route_cmp);
+	if (!router->cache_routes) {
 		return NULL;
 	}
 
@@ -211,7 +246,10 @@
 	RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
 	SCOPED_AO2LOCK(lock, router);
 
-	if ((existing_route = find_route(router, route->message_type))) {
+	existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
+
+	if (existing_route) {
+		ast_log(LOG_ERROR, "Cannot add route; route exists\n");
 		return -1;
 	}
 
@@ -219,10 +257,27 @@
 	return 0;
 }
 
+/*! \internal \brief Link \a route into the cache routes; 0 on success, -1 on failure */
+static int add_cache_route(struct stasis_message_router *router,
+		     struct stasis_message_route *route)
+{
+	RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
+	SCOPED_AO2LOCK(lock, router);
+
+	existing_route = ao2_find(router->cache_routes, route->message_type,
+		OBJ_KEY);
+	if (existing_route) {
+		ast_log(LOG_ERROR, "Cannot add route; route exists\n");
+		return -1;
+	}
+
+	/* ao2_link() reports failure with a zero return; don't claim success */
+	return ao2_link(router->cache_routes, route) ? 0 : -1;
+}
+
 int stasis_message_router_add(struct stasis_message_router *router,
-			      struct stasis_message_type *message_type,
-			      stasis_subscription_cb callback,
-			      void *data)
+	struct stasis_message_type *message_type,
+	stasis_subscription_cb callback, void *data)
 {
 	RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
 
@@ -234,12 +289,37 @@
 	return add_route(router, route);
 }
 
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+	struct stasis_message_type *message_type,
+	stasis_subscription_cb callback, void *data)
+{
+	/* ao2_cleanup runs on new_route whenever this scope is left */
+	RAII_VAR(struct stasis_message_route *, new_route,
+		route_create(message_type, callback, data), ao2_cleanup);
+
+	if (new_route == NULL) {
+		return -1;
+	}
+	return add_cache_route(router, new_route);
+}
+
 void stasis_message_router_remove(struct stasis_message_router *router,
-			      struct stasis_message_type *message_type)
-{
-	SCOPED_AO2LOCK(lock, router);
-
-	ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+	struct stasis_message_type *message_type)
+{
+	SCOPED_AO2LOCK(lock, router);
+
+	ao2_find(router->routes, message_type,
+		OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+}
+
+void stasis_message_router_remove_cache_update(
+	struct stasis_message_router *router,
+	struct stasis_message_type *message_type)
+{
+	/* Unlink by key from the cache routes; the call's result is discarded */
+	SCOPED_AO2LOCK(lock, router);
+	ao2_find(router->cache_routes, message_type,
+		OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
 }
 
 int stasis_message_router_set_default(struct stasis_message_router *router,




More information about the svn-commits mailing list