[svn-commits] dlee: branch dlee/ASTERISK-21969 r396360 - in /team/dlee/ASTERISK-21969/res: ...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Wed Aug 7 15:53:52 CDT 2013


Author: dlee
Date: Wed Aug  7 15:53:51 2013
New Revision: 396360

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396360
Log:
The pieces are scattered all over the floor.

Modified:
    team/dlee/ASTERISK-21969/res/res_stasis.c
    team/dlee/ASTERISK-21969/res/stasis/app.c
    team/dlee/ASTERISK-21969/res/stasis/app.h

Modified: team/dlee/ASTERISK-21969/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/res_stasis.c?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/res_stasis.c (original)
+++ team/dlee/ASTERISK-21969/res/res_stasis.c Wed Aug  7 15:53:51 2013
@@ -82,6 +82,12 @@
 #define CONTROLS_NUM_BUCKETS 127
 
 /*!
+ * \brief Number of buckets for the Stasis bridges hash table.  Remember to
+ * keep it a prime number!
+ */
+#define BRIDGES_NUM_BUCKETS 127
+
+/*!
  * \brief Stasis application container.
  */
 struct ao2_container *apps_registry;
@@ -89,12 +95,6 @@
 struct ao2_container *app_controls;
 
 struct ao2_container *app_bridges;
-
-/*! \brief Message router for the channel caching topic */
-struct stasis_message_router *channel_router;
-
-/*! \brief Message router for the bridge caching topic */
-struct stasis_message_router *bridge_router;
 
 /*! AO2 hash function for \ref app */
 static int app_hash(const void *obj, const int flags)
@@ -202,36 +202,6 @@
 
 /*! \brief Typedef for blob handler callbacks */
 typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
-
-/*! \brief Callback to check whether an app is watching a given channel */
-static int app_watching_channel_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-	char *uniqueid = arg;
-
-	return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified channel */
-static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
-{
-	struct ao2_container *watching_apps;
-	char *uniqueid_dup;
-	RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
-	ast_assert(uniqueid != NULL);
-
-	uniqueid_dup = ast_strdupa(uniqueid);
-
-	watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
-	watching_apps = watching_apps_iter->c;
-
-	if (!ao2_container_count(watching_apps)) {
-		return NULL;
-	}
-
-	ao2_ref(watching_apps, +1);
-	return watching_apps_iter->c;
-}
 
 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
 typedef struct ast_json *(*channel_snapshot_monitor)(
@@ -354,75 +324,6 @@
 	channel_dialplan,
 	channel_callerid
 };
-
-static int app_send_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-	struct ast_json *msg = arg;
-
-	app_send(app, msg);
-	return 0;
-}
-
-static void sub_channel_snapshot_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-	struct stasis_cache_update *update = stasis_message_data(message);
-	struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
-	struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-	/* Pull timestamp from the new snapshot, or from the update message
-	 * when there isn't one. */
-	const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-	int i;
-
-	watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
-	if (!watching_apps) {
-		return;
-	}
-
-	for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
-		RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-		msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
-		if (msg) {
-			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
-		}
-	}
-}
-
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
-	ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static void sub_channel_blob_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-	struct ast_channel_blob *obj = stasis_message_data(message);
-
-	if (!obj->snapshot) {
-		return;
-	}
-
-	msg = stasis_message_to_json(message);
-	if (!msg) {
-		return;
-	}
-
-	watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
-	if (!watching_apps) {
-		return;
-	}
-
-	distribute_message(watching_apps, msg);
-}
 
 /*!
  * \brief In addition to running ao2_cleanup(), this function also removes the
@@ -471,7 +372,7 @@
 	ast_bridge_destroy(bridge);
 }
 
-int app_send_start_msg(struct app *app, struct ast_channel *chan,
+static int send_start_msg(struct app *app, struct ast_channel *chan,
 	int argc, char *argv[])
 {
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -512,7 +413,7 @@
 	return 0;
 }
 
-int app_send_end_msg(struct app *app, struct ast_channel *chan)
+static int send_end_msg(struct app *app, struct ast_channel *chan)
 {
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
 	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -554,6 +455,7 @@
 
 	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
+	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
 	int res = 0;
 
 	ast_assert(chan != NULL);
@@ -577,15 +479,17 @@
 	}
 	ao2_link(app_controls, control);
 
-	res = app_send_start_msg(app, chan, argc, argv);
+	res = send_start_msg(app, chan, argc, argv);
 	if (res != 0) {
 		ast_log(LOG_ERROR,
 			"Error sending start message to '%s'\n", app_name);
-		return res;
-	}
-
-	if (app_add_channel(app, chan)) {
-		ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
+		return -1;
+	}
+
+	forwards = app_subscribe_channel(app, chan);
+	if (!forwards) {
+		ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
+			app_name, ast_channel_name(chan));
 		return -1;
 	}
 
@@ -637,8 +541,7 @@
 		}
 	}
 
-	app_remove_channel(app, chan);
-	res = app_send_end_msg(app, chan);
+	res = send_end_msg(app, chan);
 	if (res != 0) {
 		ast_log(LOG_ERROR,
 			"Error sending end message to %s\n", app_name);
@@ -749,296 +652,28 @@
 	ast_module_unref(ast_module_info->self);
 }
 
-/*! \brief Callback to check whether an app is watching a given bridge */
-static int app_watching_bridge_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-	char *uniqueid = arg;
-
-	return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified bridge */
-static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
-{
-	struct ao2_container *watching_apps;
-	char *uniqueid_dup;
-	RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
-	ast_assert(uniqueid != NULL);
-
-	uniqueid_dup = ast_strdupa(uniqueid);
-
-	watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
-	watching_apps = watching_apps_iter->c;
-
-	if (!ao2_container_count(watching_apps)) {
-		return NULL;
-	}
-
-	ao2_ref(watching_apps, +1);
-	return watching_apps_iter->c;
-}
-
-/*! Callback used to remove an app's interest in a bridge */
-static int remove_bridge_cb(void *obj, void *arg, int flags)
-{
-	app_remove_bridge(obj, arg);
-	return 0;
-}
-
-static struct ast_json *simple_bridge_event(
-	const char *type,
-	struct ast_bridge_snapshot *snapshot,
-	const struct timeval *tv)
-{
-	return ast_json_pack("{s: s, s: o, s: o}",
-		"type", type,
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"bridge", ast_bridge_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *simple_bridge_channel_event(
-	const char *type,
-	struct ast_bridge_snapshot *bridge_snapshot,
-	struct ast_channel_snapshot *channel_snapshot,
-	const struct timeval *tv)
-{
-	return ast_json_pack("{s: s, s: o, s: o, s: o}",
-		"type", type,
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
-		"channel", ast_channel_snapshot_to_json(channel_snapshot));
-}
-
-static void sub_bridge_snapshot_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-	struct stasis_cache_update *update = stasis_message_data(message);
-	struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
-	struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-	const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-	watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
-	if (!watching_apps || !ao2_container_count(watching_apps)) {
-		return;
-	}
-
-	if (!new_snapshot) {
-		RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
-
-		/* The bridge has gone away. Create the message, make sure no apps are
-		 * watching this bridge anymore, and destroy the bridge's control
-		 * structure */
-		msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
-		ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
-		stasis_app_bridge_destroy(old_snapshot->uniqueid);
-	} else if (!old_snapshot) {
-		msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
-	}
-
-	if (!msg) {
-		return;
-	}
-
-	distribute_message(watching_apps, msg);
-}
-
-/*! \brief Callback used to merge two containers of applications */
-static int list_merge_cb(void *obj, void *arg, int flags)
-{
-	/* remove any current entries for this app */
-	ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
-	/* relink as the only entry */
-	ao2_link(arg, obj);
-	return 0;
-}
-
-/*! \brief Merge container src into container dst without modifying src */
-static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
-{
-	ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
-}
-
-/*! \brief Callback for adding to an app's bridges of interest */
-static int app_add_bridge_cb(void *obj, void *arg, int flags)
-{
-	app_add_bridge(obj, arg);
-	return 0;
-}
-
-/*! \brief Add interest in the given bridge to all apps in the container */
-static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
-{
-	RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
-	ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
-}
-
-static void sub_bridge_merge_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-	struct ast_bridge_merge_message *merge = stasis_message_data(message);
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
-	const struct timeval *tv = stasis_message_timestamp(message);
-
-	watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
-	if (watching_apps_to) {
-		update_apps_list(watching_apps_all, watching_apps_to);
-	}
-
-	watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
-	if (watching_apps_from) {
-		update_bridge_interest(watching_apps_from, merge->to->uniqueid);
-		update_apps_list(watching_apps_all, watching_apps_from);
-	}
-
-	if (!ao2_container_count(watching_apps_all)) {
-		return;
-	}
-
-	msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
-		"type", "BridgeMerged",
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"bridge", ast_bridge_snapshot_to_json(merge->to),
-		"bridge_from", ast_bridge_snapshot_to_json(merge->from));
-
-	if (!msg) {
-		return;
-	}
-
-	distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_enter_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-	struct ast_bridge_blob *obj = stasis_message_data(message);
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-	watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-	if (watching_apps_bridge) {
-		update_apps_list(watching_apps_all, watching_apps_bridge);
-	}
-
-	watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
-	if (watching_apps_channel) {
-		update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
-		update_apps_list(watching_apps_all, watching_apps_channel);
-	}
-
-	if (!ao2_container_count(watching_apps_all)) {
-		return;
-	}
-
-	msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
-		obj->channel, stasis_message_timestamp(message));
-
-	distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_leave_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
-	struct ast_bridge_blob *obj = stasis_message_data(message);
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-	watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-	if (!watching_apps_bridge) {
-		return;
-	}
-
-	msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
-		obj->channel, stasis_message_timestamp(message));
-
-	distribute_message(watching_apps_bridge, msg);
-}
-
 static int load_module(void)
 {
-	int r = 0;
-
-	apps_registry =
-		ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
+	apps_registry =	ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
+		app_compare);
 	if (apps_registry == NULL) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-					     control_hash, control_compare);
+	app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
+		control_compare);
 	if (app_controls == NULL) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-					     bridges_hash, bridges_compare);
-	if (app_bridges == NULL) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
-	if (!channel_router) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
-	/* TODO: This could be handled a lot better. Instead of subscribing to
-	 * the one caching topic and filtering out messages by channel id, we
-	 * should have individual caching topics per-channel, with a shared
-	 * back-end cache. That would simplify a lot of what's going on right
-	 * here.
-	 */
-	r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
-	r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
-	r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
-	r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
-	if (r) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
-	if (!bridge_router) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
-	r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
-	r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
-	r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
-	if (r) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
+        app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
+		bridges_compare);
 
 	return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
-	int r = 0;
-
-	stasis_message_router_unsubscribe_and_join(channel_router);
-	channel_router = NULL;
-
-	stasis_message_router_unsubscribe_and_join(bridge_router);
-	bridge_router = NULL;
-
 	ao2_cleanup(apps_registry);
 	apps_registry = NULL;
 
@@ -1048,7 +683,7 @@
 	ao2_cleanup(app_bridges);
 	app_bridges = NULL;
 
-	return r;
+	return 0;
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",

Modified: team/dlee/ASTERISK-21969/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.c?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.c (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.c Wed Aug  7 15:53:51 2013
@@ -30,49 +30,65 @@
 #include "app.h"
 
 #include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances.  Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances.  Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
 
 struct app {
+	/*! Atomic integer counting channels in this app */
+	int channel_count;
+	/*! Aggregation topic for this application. */
+	struct stasis_topic *topic;
+	/*! Router for handling messages forwarded to \a topic. */
+	struct stasis_message_router *router;
 	/*! Callback function for this application. */
 	stasis_app_cb handler;
 	/*! Opaque data to hand to callback function. */
 	void *data;
-	/*! List of channel identifiers this app instance is interested in */
-	struct ao2_container *channels;
-	/*! List of bridge identifiers this app instance owns */
-	struct ao2_container *bridges;
 	/*! Name of the Stasis application */
 	char name[];
 };
 
+struct app_forwards {
+	struct app *app;
+
+	struct stasis_subscription *channel_forward;
+	struct stasis_subscription *channel_cached_forward;
+
+	struct stasis_subscription *bridge_forward;
+	struct stasis_subscription *bridge_cached_forward;
+};
+
 static void app_dtor(void *obj)
 {
 	struct app *app = obj;
+
+	ao2_cleanup(app->topic);
+	app->topic = NULL;
 
 	ao2_cleanup(app->data);
 	app->data = NULL;
-	ao2_cleanup(app->channels);
-	app->channels = NULL;
-	ao2_cleanup(app->bridges);
-	app->bridges = NULL;
+}
+
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+	struct stasis_topic *topic, struct stasis_message *message)
+{
+	struct app *app = data;
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+	json = stasis_message_to_json(message);
+	if (!json) {
+		return;
+	}
+
+	app_send(app, json);
 }
 
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
 	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	size_t size;
+	int res = 0;
 
 	ast_assert(name != NULL);
 	ast_assert(handler != NULL);
@@ -85,76 +101,53 @@
 	if (!app) {
 		return NULL;
 	}
+
+	app->topic = stasis_topic_create(name);
+	if (!app->topic) {
+		return NULL;
+	}
+
+	app->router = stasis_message_router_create(app->topic);
+	if (!app->router) {
+		return NULL;
+	}
+
+	/*
+	res |= stasis_message_router_add(app->router,
+		ast_channel_user_event_type(), sub_channel_blob_handler, app);
+        res |= stasis_message_router_add(app->router,
+		ast_channel_varset_type(), sub_channel_blob_handler, app);
+        res |= stasis_message_router_add(app->router,
+		ast_channel_dtmf_end_type(), sub_channel_blob_handler, app);
+        res |= stasis_message_router_add(app->router,
+		ast_channel_hangup_request_type(), sub_channel_blob_handler,
+		app);
+        res |= stasis_message_router_add(app->router,
+		stasis_cache_update_type(), sub_bridge_snapshot_handler, app);
+        res |= stasis_message_router_add(app->router,
+		ast_bridge_merge_message_type(), sub_bridge_merge_handler,
+		app);
+        res |= stasis_message_router_add(app->router,
+		ast_channel_entered_bridge_type(), sub_bridge_enter_handler,
+		app);
+        res |= stasis_message_router_add(app->router,
+		ast_channel_left_bridge_type(), sub_bridge_leave_handler, app);
+	*/
+	res |= stasis_message_router_set_default(app->router,
+		sub_default_handler, app);
+
+	if (res != 0) {
+		return NULL;
+	}
+
 
 	strncpy(app->name, name, size - sizeof(*app));
 	app->handler = handler;
 	ao2_ref(data, +1);
 	app->data = data;
 
-	app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
-	if (!app->channels) {
-		return NULL;
-	}
-
-	app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS);
-	if (!app->bridges) {
-		return NULL;
-	}
-
 	ao2_ref(app, +1);
 	return app;
-}
-
-int app_add_channel(struct app *app, const struct ast_channel *chan)
-{
-	SCOPED_AO2LOCK(lock, app);
-	const char *uniqueid;
-
-	ast_assert(app != NULL);
-	ast_assert(chan != NULL);
-
-	/* Don't accept new channels in an inactive application */
-	if (!app->handler) {
-		return -1;
-	}
-
-	uniqueid = ast_channel_uniqueid(chan);
-	return ast_str_container_add(app->channels, uniqueid) ? -1 : 0;
-}
-
-void app_remove_channel(struct app* app, const struct ast_channel *chan)
-{
-	SCOPED_AO2LOCK(lock, app);
-
-	ast_assert(app != NULL);
-	ast_assert(chan != NULL);
-
-	ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
-}
-
-int app_add_bridge(struct app *app, const char *uniqueid)
-{
-	SCOPED_AO2LOCK(lock, app);
-
-	ast_assert(app != NULL);
-	ast_assert(uniqueid != NULL);
-
-	/* Don't accept new bridges in an inactive application */
-	if (!app->handler) {
-		return -1;
-	}
-
-	return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0;
-}
-
-void app_remove_bridge(struct app* app, const char *uniqueid)
-{
-	SCOPED_AO2LOCK(lock, app);
-
-	ast_assert(app != NULL);
-	ast_assert(uniqueid != NULL);
-
-	ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE);
 }
 
 /*!
@@ -207,7 +200,7 @@
 	SCOPED_AO2LOCK(lock, app);
 
 	return app->handler == NULL &&
-		ao2_container_count(app->channels) == 0;
+		ast_atomic_fetchadd_int(&app->channel_count, 0) == 0;
 }
 
 void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -243,16 +236,96 @@
 	return app->name;
 }
 
-int app_is_watching_channel(struct app *app, const char *uniqueid)
-{
-	RAII_VAR(char *, found, NULL, ao2_cleanup);
-	found = ao2_find(app->channels, uniqueid, OBJ_KEY);
-	return found != NULL;
-}
-
-int app_is_watching_bridge(struct app *app, const char *uniqueid)
-{
-	RAII_VAR(char *, found, NULL, ao2_cleanup);
-	found = ao2_find(app->bridges, uniqueid, OBJ_KEY);
-	return found != NULL;
-}
+static void forwards_dtor(void *obj)
+{
+	struct app_forwards *forwards = obj;
+
+	stasis_unsubscribe(forwards->channel_forward);
+	forwards->channel_forward = NULL;
+	stasis_unsubscribe(forwards->channel_cached_forward);
+	forwards->channel_cached_forward = NULL;
+	stasis_unsubscribe(forwards->bridge_forward);
+	forwards->bridge_forward = NULL;
+	stasis_unsubscribe(forwards->bridge_cached_forward);
+	forwards->bridge_cached_forward = NULL;
+
+	ast_atomic_fetchadd_int(&forwards->app->channel_count, -1);
+
+	ao2_cleanup(forwards->app);
+	forwards->app = NULL;
+}
+
+struct app_forwards *app_subscribe_channel(struct app *app,
+	struct ast_channel *chan)
+{
+	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+	if (!app || !chan) {
+		return NULL;
+	}
+
+	forwards = ao2_alloc(sizeof(*forwards), forwards_dtor);
+	if (!forwards) {
+		return NULL;
+	}
+
+	ao2_ref(app, +1);
+	forwards->app = app;
+
+	forwards->channel_forward = stasis_forward_all(ast_channel_topic(chan),
+		app->topic);
+	if (!forwards->channel_forward) {
+		return NULL;
+	}
+
+	forwards->channel_cached_forward = stasis_forward_all(
+		ast_channel_topic_cached(chan), app->topic);
+	if (!forwards->channel_cached_forward) {
+		return NULL;
+	}
+
+	ast_atomic_fetchadd_int(&forwards->app->channel_count, +1);
+	ao2_ref(forwards, +1);
+	return forwards;
+}
+
+int app_subscribe_bridge(struct app_forwards *forwards,
+	struct ast_bridge *bridge)
+{
+	if (!forwards || !bridge) {
+		return -1;
+	}
+
+	ast_assert(forwards->app != NULL);
+
+	app_unsubscribe_bridge(forwards);
+
+	forwards->bridge_forward = stasis_forward_all(
+		ast_bridge_topic(bridge), forwards->app->topic);
+	if (!forwards->bridge_forward) {
+		return -1;
+	}
+
+	forwards->bridge_cached_forward = stasis_forward_all(
+		ast_bridge_topic_cached(bridge), forwards->app->topic);
+	if (!forwards->bridge_cached_forward) {
+		/* Probably a bad idea to stay half-subscribed */
+		stasis_unsubscribe(forwards->bridge_forward);
+		forwards->bridge_forward = NULL;
+		return -1;
+	}
+
+	return 0;
+}
+
+void app_unsubscribe_bridge(struct app_forwards *forwards)
+{
+	if (!forwards) {
+		return;
+	}
+
+	stasis_unsubscribe(forwards->bridge_forward);
+	forwards->bridge_forward = NULL;
+	stasis_unsubscribe(forwards->bridge_cached_forward);
+	forwards->bridge_cached_forward = NULL;
+}

Modified: team/dlee/ASTERISK-21969/res/stasis/app.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.h?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.h (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.h Wed Aug  7 15:53:51 2013
@@ -96,17 +96,6 @@
 const char *app_name(const struct app *app);
 
 /*!
- * \brief Subscribe an application to a topic.
- *
- * \param app Application.
- * \param topic Topic to subscribe to.
- * \return New subscription.
- * \return \c NULL on error.
- */
-struct stasis_subscription *app_subscribe(struct app *app,
-	struct stasis_topic *topic);
-
-/*!
  * \brief Send a message to an application.
  *
  * \param app Application.
@@ -114,83 +103,41 @@
  */
 void app_send(struct app *app, struct ast_json *message);
 
+struct app_forwards;
+
 /*!
- * \brief Send the start message to an application.
+ * \brief Subscribes an application to a channel.
+ *
+ * The returned object is AO2 managed, and should be ao2_cleanup()'ed to kill
+ * the subscriptions.
  *
  * \param app Application.
- * \param chan The channel entering the application.
- * \param argc The number of arguments for the application.
- * \param argv The arguments for the application.
+ * \param chan Channel to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc,
-	char *argv[]);
+struct app_forwards *app_subscribe_channel(struct app *app,
+	struct ast_channel *chan);
 
 /*!
- * \brief Send the end message to an application.
+ * \brief Add a bridge subscription to an existing channel subscription.
  *
- * \param app Application.
- * \param chan The channel leaving the application.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_send_end_msg(struct app *app, struct ast_channel *chan);
+int app_subscribe_bridge(struct app_forwards *forwards,
+	struct ast_bridge *bridge);
 
 /*!
- * \brief Checks if an application is watching a given channel.
+ * \brief Cancel the bridge subscription for an application.
  *
- * \param app Application.
- * \param uniqueid Uniqueid of the channel to check about.
- * \return True (non-zero) if \a app is watching channel with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_channel(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Add a channel to an application's watch list.
- *
- * \param app Application.
- * \param chan Channel to watch.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_add_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Remove a channel from an application's watch list.
- *
- * \param app Application.
- * \param chan Channel to watch.
- */
-void app_remove_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Add a bridge to an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to watch.
- * \return 0 on success.
- * \return Non-zero on error.
- */
-int app_add_bridge(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Remove a bridge from an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to remove.
- */
-void app_remove_bridge(struct app* app, const char *uniqueid);
-
-/*!
- * \brief Checks if an application is watching a given bridge.
- *
- * \param app Application.
- * \param uniqueid Uniqueid of the bridge to check.
- * \return True (non-zero) if \a app is watching bridge with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_bridge(struct app *app, const char *uniqueid);
+void app_unsubscribe_bridge(struct app_forwards *forwards);
 
 #endif /* _ASTERISK_RES_STASIS_APP_H */




More information about the svn-commits mailing list