[asterisk-commits] dlee: trunk r397820 - in /trunk: ./ main/ res/ res/stasis/ rest-api-templates/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Aug 27 14:19:42 CDT 2013


Author: dlee
Date: Tue Aug 27 14:19:36 2013
New Revision: 397820

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=397820
Log:
ARI: WebSocket event cleanup

Stasis events (which get distributed over the ARI WebSocket) are created
by subscribing to the channel_all_cached and bridge_all_cached topics and
keeping only the events for channels/bridges that an app is currently
subscribed to.

There are two issues with that. The first is a race condition, where
messages in flight to the master subscribe-to-all-things topic would get
sent out even though the events happened before the channel was put
into Stasis. The second is that, as the number of channels and bridges
in the system grows, the work spent filtering messages becomes excessive.

Since r395954, individual channels and bridges have caching topics, and
can be subscribed to individually. This patch takes advantage of that,
so that channels and bridges are subscribed to on demand, instead of
filtering the global topics.

The one case where filtering is still required is handling BridgeMerge
messages, which are published directly to the bridge_all topic.

Other than the change to how subscriptions work, this patch mostly just
moves code around. Most of the work generating JSON objects from
messages was moved to .to_json handlers on the message types. The
callback functions handling app subscriptions were moved from res_stasis
(because they were global to the model) to stasis/app.c (because they
are now local to the app).

(closes issue ASTERISK-21969)
Reported by: Matt Jordan
Review: https://reviewboard.asterisk.org/r/2754/
........

Merged revisions 397816 from http://svn.asterisk.org/svn/asterisk/branches/12

Modified:
    trunk/   (props changed)
    trunk/main/stasis_bridges.c
    trunk/res/res_ari_asterisk.c
    trunk/res/res_ari_bridges.c
    trunk/res/res_ari_events.c
    trunk/res/res_stasis.c
    trunk/res/stasis/app.c
    trunk/res/stasis/app.h
    trunk/rest-api-templates/param_parsing.mustache
    trunk/rest-api-templates/res_ari_resource.c.mustache

Propchange: trunk/
------------------------------------------------------------------------------
--- branch-12-merged (original)
+++ branch-12-merged Tue Aug 27 14:19:36 2013
@@ -1,1 +1,1 @@
-/branches/12:1-397673,397690,397713,397745,397759,397809
+/branches/12:1-397816

Modified: trunk/main/stasis_bridges.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis_bridges.c?view=diff&rev=397820&r1=397819&r2=397820
==============================================================================
--- trunk/main/stasis_bridges.c (original)
+++ trunk/main/stasis_bridges.c Tue Aug 27 14:19:36 2013
@@ -132,6 +132,9 @@
 
 static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
 static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg);
 
 static struct stasis_cp_all *bridge_cache_all;
 
@@ -139,9 +142,12 @@
  * @{ \brief Define bridge message types.
  */
 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type,
+	.to_json = ast_bridge_merge_message_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+	.to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+	.to_json = ast_channel_left_bridge_to_json);
 STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami);
 STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
 /*! @} */
@@ -305,6 +311,19 @@
 
 	ao2_ref(msg, +1);
 	return msg;
+}
+
+static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg)
+{
+	struct ast_bridge_merge_message *merge;
+
+	merge = stasis_message_data(msg);
+
+        return ast_json_pack("{s: s, s: o, s: o, s: o}",
+                "type", "BridgeMerged",
+                "timestamp", ast_json_timeval(*stasis_message_timestamp(msg), NULL),
+                "bridge", ast_bridge_snapshot_to_json(merge->to),
+                "bridge_from", ast_bridge_snapshot_to_json(merge->from));
 }
 
 void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
@@ -415,6 +434,35 @@
 	/* state first, then leave blob (opposite of enter, preserves nesting of events) */
 	bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
 	stasis_publish(ast_bridge_topic(bridge), msg);
+}
+
+static struct ast_json *simple_bridge_channel_event(
+        const char *type,
+        struct ast_bridge_snapshot *bridge_snapshot,
+        struct ast_channel_snapshot *channel_snapshot,
+        const struct timeval *tv)
+{
+        return ast_json_pack("{s: s, s: o, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+                "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+	return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+	return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
 }
 
 typedef struct ast_json *(*json_item_serializer_cb)(void *obj);

Modified: trunk/res/res_ari_asterisk.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_ari_asterisk.c?view=diff&rev=397820&r1=397819&r2=397820
==============================================================================
--- trunk/res/res_ari_asterisk.c (original)
+++ trunk/res/res_ari_asterisk.c Tue Aug 27 14:19:36 2013
@@ -81,8 +81,16 @@
 				goto fin;
 			}
 
-			args.only_count = ast_app_separate_args(
-				args.only_parse, ',', vals, ARRAY_LEN(vals));
+			if (strlen(args.only_parse) == 0) {
+				/* ast_app_separate_args can't handle "" */
+				args.only_count = 1;
+				vals[0] = args.only_parse;
+			} else {
+				args.only_count = ast_app_separate_args(
+					args.only_parse, ',', vals,
+					ARRAY_LEN(vals));
+			}
+
 			if (args.only_count == 0) {
 				ast_ari_response_alloc_failed(response);
 				goto fin;

Modified: trunk/res/res_ari_bridges.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_ari_bridges.c?view=diff&rev=397820&r1=397819&r2=397820
==============================================================================
--- trunk/res/res_ari_bridges.c (original)
+++ trunk/res/res_ari_bridges.c Tue Aug 27 14:19:36 2013
@@ -300,8 +300,16 @@
 				goto fin;
 			}
 
-			args.channel_count = ast_app_separate_args(
-				args.channel_parse, ',', vals, ARRAY_LEN(vals));
+			if (strlen(args.channel_parse) == 0) {
+				/* ast_app_separate_args can't handle "" */
+				args.channel_count = 1;
+				vals[0] = args.channel_parse;
+			} else {
+				args.channel_count = ast_app_separate_args(
+					args.channel_parse, ',', vals,
+					ARRAY_LEN(vals));
+			}
+
 			if (args.channel_count == 0) {
 				ast_ari_response_alloc_failed(response);
 				goto fin;
@@ -403,8 +411,16 @@
 				goto fin;
 			}
 
-			args.channel_count = ast_app_separate_args(
-				args.channel_parse, ',', vals, ARRAY_LEN(vals));
+			if (strlen(args.channel_parse) == 0) {
+				/* ast_app_separate_args can't handle "" */
+				args.channel_count = 1;
+				vals[0] = args.channel_parse;
+			} else {
+				args.channel_count = ast_app_separate_args(
+					args.channel_parse, ',', vals,
+					ARRAY_LEN(vals));
+			}
+
 			if (args.channel_count == 0) {
 				ast_ari_response_alloc_failed(response);
 				goto fin;

Modified: trunk/res/res_ari_events.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_ari_events.c?view=diff&rev=397820&r1=397819&r2=397820
==============================================================================
--- trunk/res/res_ari_events.c (original)
+++ trunk/res/res_ari_events.c Tue Aug 27 14:19:36 2013
@@ -89,8 +89,16 @@
 				goto fin;
 			}
 
-			args.app_count = ast_app_separate_args(
-				args.app_parse, ',', vals, ARRAY_LEN(vals));
+			if (strlen(args.app_parse) == 0) {
+				/* ast_app_separate_args can't handle "" */
+				args.app_count = 1;
+				vals[0] = args.app_parse;
+			} else {
+				args.app_count = ast_app_separate_args(
+					args.app_parse, ',', vals,
+					ARRAY_LEN(vals));
+			}
+
 			if (args.app_count == 0) {
 				ast_ari_response_alloc_failed(response);
 				goto fin;
@@ -126,14 +134,16 @@
 		 * negotiation. Param parsing should happen earlier, but we
 		 * need a way to pass it through the WebSocket code to the
 		 * callback */
-		RAII_VAR(char *, msg, NULL, ast_free);
+		RAII_VAR(char *, msg, NULL, ast_json_free);
 		if (response->message) {
 			msg = ast_json_dump_string(response->message);
 		} else {
-			msg = ast_strdup("?");
+			ast_log(LOG_ERROR, "Missing response message\n");
 		}
-		ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg,
-			strlen(msg));
+		if (msg) {
+			ast_websocket_write(ws_session,
+				AST_WEBSOCKET_OPCODE_TEXT, msg,	strlen(msg));
+		}
 	}
 	ast_free(args.app_parse);
 	ast_free(args.app);

Modified: trunk/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_stasis.c?view=diff&rev=397820&r1=397819&r2=397820
==============================================================================
--- trunk/res/res_stasis.c (original)
+++ trunk/res/res_stasis.c Tue Aug 27 14:19:36 2013
@@ -87,6 +87,12 @@
 #define CONTROLS_NUM_BUCKETS 127
 
 /*!
+ * \brief Number of buckets for the Stasis bridges hash table.  Remember to
+ * keep it a prime number!
+ */
+#define BRIDGES_NUM_BUCKETS 127
+
+/*!
  * \brief Stasis application container.
  */
 struct ao2_container *apps_registry;
@@ -96,12 +102,6 @@
 struct ao2_container *app_bridges;
 
 struct ao2_container *app_bridges_moh;
-
-/*! \brief Message router for the channel caching topic */
-struct stasis_message_router *channel_router;
-
-/*! \brief Message router for the bridge caching topic */
-struct stasis_message_router *bridge_router;
 
 /*! AO2 hash function for \ref app */
 static int app_hash(const void *obj, const int flags)
@@ -151,6 +151,30 @@
 	} else {
 		return 0;
 	}
+}
+
+static int cleanup_cb(void *obj, void *arg, int flags)
+{
+	struct app *app = obj;
+
+	if (!app_is_finished(app)) {
+		return 0;
+	}
+
+	ast_verb(1, "Shutting down application '%s'\n", app_name(app));
+	app_shutdown(app);
+
+	return CMP_MATCH;
+
+}
+
+/*!
+ * \brief Clean up any old apps that we don't need any more.
+ */
+static void cleanup(void)
+{
+	ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
+		cleanup_cb, NULL);
 }
 
 struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
@@ -435,229 +459,6 @@
 	return ao2_find(app_bridges, bridge_id, OBJ_KEY);
 }
 
-/*! \brief Typedef for blob handler callbacks */
-typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
-
-/*! \brief Callback to check whether an app is watching a given channel */
-static int app_watching_channel_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-	char *uniqueid = arg;
-
-	return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified channel */
-static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
-{
-	struct ao2_container *watching_apps;
-	char *uniqueid_dup;
-	RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
-	ast_assert(uniqueid != NULL);
-
-	uniqueid_dup = ast_strdupa(uniqueid);
-
-	watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
-	watching_apps = watching_apps_iter->c;
-
-	if (!ao2_container_count(watching_apps)) {
-		return NULL;
-	}
-
-	ao2_ref(watching_apps, +1);
-	return watching_apps_iter->c;
-}
-
-/*! \brief Typedef for callbacks that get called on channel snapshot updates */
-typedef struct ast_json *(*channel_snapshot_monitor)(
-	struct ast_channel_snapshot *old_snapshot,
-	struct ast_channel_snapshot *new_snapshot,
-	const struct timeval *tv);
-
-static struct ast_json *simple_channel_event(
-	const char *type,
-	struct ast_channel_snapshot *snapshot,
-	const struct timeval *tv)
-{
-	return ast_json_pack("{s: s, s: o, s: o}",
-		"type", type,
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"channel", ast_channel_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *channel_created_event(
-	struct ast_channel_snapshot *snapshot,
-	const struct timeval *tv)
-{
-	return simple_channel_event("ChannelCreated", snapshot, tv);
-}
-
-static struct ast_json *channel_destroyed_event(
-	struct ast_channel_snapshot *snapshot,
-	const struct timeval *tv)
-{
-	return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
-		"type", "ChannelDestroyed",
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"cause", snapshot->hangupcause,
-		"cause_txt", ast_cause2str(snapshot->hangupcause),
-		"channel", ast_channel_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *channel_state_change_event(
-	struct ast_channel_snapshot *snapshot,
-	const struct timeval *tv)
-{
-	return simple_channel_event("ChannelStateChange", snapshot, tv);
-}
-
-/*! \brief Handle channel state changes */
-static struct ast_json *channel_state(
-	struct ast_channel_snapshot *old_snapshot,
-	struct ast_channel_snapshot *new_snapshot,
-	const struct timeval *tv)
-{
-	struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
-
-	if (!old_snapshot) {
-		return channel_created_event(snapshot, tv);
-	} else if (!new_snapshot) {
-		return channel_destroyed_event(snapshot, tv);
-	} else if (old_snapshot->state != new_snapshot->state) {
-		return channel_state_change_event(snapshot, tv);
-	}
-
-	return NULL;
-}
-
-static struct ast_json *channel_dialplan(
-	struct ast_channel_snapshot *old_snapshot,
-	struct ast_channel_snapshot *new_snapshot,
-	const struct timeval *tv)
-{
-	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
-
-	/* No Newexten event on cache clear */
-	if (!new_snapshot) {
-		return NULL;
-	}
-
-	/* Empty application is not valid for a Newexten event */
-	if (ast_strlen_zero(new_snapshot->appl)) {
-		return NULL;
-	}
-
-	if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
-		return NULL;
-	}
-
-	return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
-		"type", "ChannelDialplan",
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"dialplan_app", new_snapshot->appl,
-		"dialplan_app_data", new_snapshot->data,
-		"channel", ast_channel_snapshot_to_json(new_snapshot));
-}
-
-static struct ast_json *channel_callerid(
-	struct ast_channel_snapshot *old_snapshot,
-	struct ast_channel_snapshot *new_snapshot,
-	const struct timeval *tv)
-{
-	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
-
-	/* No NewCallerid event on cache clear or first event */
-	if (!old_snapshot || !new_snapshot) {
-		return NULL;
-	}
-
-	if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
-		return NULL;
-	}
-
-	return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
-		"type", "ChannelCallerId",
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"caller_presentation", new_snapshot->caller_pres,
-		"caller_presentation_txt", ast_describe_caller_presentation(
-			new_snapshot->caller_pres),
-		"channel", ast_channel_snapshot_to_json(new_snapshot));
-}
-
-channel_snapshot_monitor channel_monitors[] = {
-	channel_state,
-	channel_dialplan,
-	channel_callerid
-};
-
-static int app_send_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-	struct ast_json *msg = arg;
-
-	app_send(app, msg);
-	return 0;
-}
-
-static void sub_channel_snapshot_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-	struct stasis_cache_update *update = stasis_message_data(message);
-	struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
-	struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-	/* Pull timestamp from the new snapshot, or from the update message
-	 * when there isn't one. */
-	const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-	int i;
-
-	watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
-	if (!watching_apps) {
-		return;
-	}
-
-	for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
-		RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-		msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
-		if (msg) {
-			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
-		}
-	}
-}
-
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
-	ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static void sub_channel_blob_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-	struct ast_channel_blob *obj = stasis_message_data(message);
-
-	if (!obj->snapshot) {
-		return;
-	}
-
-	msg = stasis_message_to_json(message);
-	if (!msg) {
-		return;
-	}
-
-	watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
-	if (!watching_apps) {
-		return;
-	}
-
-	distribute_message(watching_apps, msg);
-}
 
 /*!
  * \brief In addition to running ao2_cleanup(), this function also removes the
@@ -709,7 +510,7 @@
 	ast_bridge_destroy(bridge, 0);
 }
 
-int app_send_start_msg(struct app *app, struct ast_channel *chan,
+static int send_start_msg(struct app *app, struct ast_channel *chan,
 	int argc, char *argv[])
 {
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -726,8 +527,9 @@
 		return -1;
 	}
 
-	msg = ast_json_pack("{s: s, s: [], s: o}",
+	msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
 		"type", "StasisStart",
+		"timestamp", ast_json_timeval(ast_tvnow(), NULL),
 		"args",
 		"channel", ast_channel_snapshot_to_json(snapshot));
 	if (!msg) {
@@ -750,7 +552,7 @@
 	return 0;
 }
 
-int app_send_end_msg(struct app *app, struct ast_channel *chan)
+static int send_end_msg(struct app *app, struct ast_channel *chan)
 {
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
 	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -763,8 +565,9 @@
 		return -1;
 	}
 
-	msg = ast_json_pack("{s: s, s: o}",
+	msg = ast_json_pack("{s: s, s: o, s: o}",
 		"type", "StasisEnd",
+		"timestamp", ast_json_timeval(ast_tvnow(), NULL),
 		"channel", ast_channel_snapshot_to_json(snapshot));
 	if (!msg) {
 		return -1;
@@ -815,15 +618,17 @@
 	}
 	ao2_link(app_controls, control);
 
-	res = app_send_start_msg(app, chan, argc, argv);
+	res = send_start_msg(app, chan, argc, argv);
 	if (res != 0) {
 		ast_log(LOG_ERROR,
 			"Error sending start message to '%s'\n", app_name);
-		return res;
-	}
-
-	if (app_add_channel(app, chan)) {
-		ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
+		return -1;
+	}
+
+	res = app_subscribe_channel(app, chan);
+	if (res != 0) {
+		ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+			app_name, ast_channel_name(chan));
 		return -1;
 	}
 
@@ -831,13 +636,23 @@
 		RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
 		int r;
 		int command_count;
+		struct ast_bridge *last_bridge = NULL;
+		struct ast_bridge *bridge = NULL;
 
 		/* Check to see if a bridge absorbed our hangup frame */
 		if (ast_check_hangup_locked(chan)) {
 			break;
 		}
 
-		if (stasis_app_get_bridge(control)) {
+		last_bridge = bridge;
+		bridge = stasis_app_get_bridge(control);
+
+		if (bridge != last_bridge) {
+			app_unsubscribe_bridge(app, last_bridge);
+			app_subscribe_bridge(app, bridge);
+		}
+
+		if (bridge) {
 			/* Bridge is handling channel frames */
 			control_wait(control);
 			control_dispatch_all(control, chan);
@@ -882,13 +697,20 @@
 		}
 	}
 
-	app_remove_channel(app, chan);
-	res = app_send_end_msg(app, chan);
+	app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
+	app_unsubscribe_channel(app, chan);
+
+	res = send_end_msg(app, chan);
 	if (res != 0) {
 		ast_log(LOG_ERROR,
 			"Error sending end message to %s\n", app_name);
 		return res;
 	}
+
+	/* There's an off chance that app is ready for cleanup. Go ahead
+	 * and clean up, just in case
+	 */
+	cleanup();
 
 	return res;
 }
@@ -910,29 +732,6 @@
 
 	app_send(app, message);
 	return 0;
-}
-
-static int cleanup_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-
-	if (!app_is_finished(app)) {
-		return 0;
-	}
-
-	ast_verb(1, "Cleaning up application '%s'\n", app_name(app));
-
-	return CMP_MATCH;
-
-}
-
-/*!
- * \brief Clean up any old apps that we don't need any more.
- */
-static void cleanup(void)
-{
-	ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
-		cleanup_cb, NULL);
 }
 
 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
@@ -994,249 +793,22 @@
 	ast_module_unref(ast_module_info->self);
 }
 
-/*! \brief Callback to check whether an app is watching a given bridge */
-static int app_watching_bridge_cb(void *obj, void *arg, int flags)
-{
-	struct app *app = obj;
-	char *uniqueid = arg;
-
-	return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified bridge */
-static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
-{
-	struct ao2_container *watching_apps;
-	char *uniqueid_dup;
-	RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
-	ast_assert(uniqueid != NULL);
-
-	uniqueid_dup = ast_strdupa(uniqueid);
-
-	watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
-	watching_apps = watching_apps_iter->c;
-
-	if (!ao2_container_count(watching_apps)) {
-		return NULL;
-	}
-
-	ao2_ref(watching_apps, +1);
-	return watching_apps_iter->c;
-}
-
-/*! Callback used to remove an app's interest in a bridge */
-static int remove_bridge_cb(void *obj, void *arg, int flags)
-{
-	app_remove_bridge(obj, arg);
-	return 0;
-}
-
-static struct ast_json *simple_bridge_event(
-	const char *type,
-	struct ast_bridge_snapshot *snapshot,
-	const struct timeval *tv)
-{
-	return ast_json_pack("{s: s, s: o, s: o}",
-		"type", type,
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"bridge", ast_bridge_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *simple_bridge_channel_event(
-	const char *type,
-	struct ast_bridge_snapshot *bridge_snapshot,
-	struct ast_channel_snapshot *channel_snapshot,
-	const struct timeval *tv)
-{
-	return ast_json_pack("{s: s, s: o, s: o, s: o}",
-		"type", type,
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
-		"channel", ast_channel_snapshot_to_json(channel_snapshot));
-}
-
-static void sub_bridge_snapshot_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-	struct stasis_cache_update *update = stasis_message_data(message);
-	struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
-	struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-	const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-	watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
-	if (!watching_apps || !ao2_container_count(watching_apps)) {
-		return;
-	}
-
-	if (!new_snapshot) {
-		RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
-
-		/* The bridge has gone away. Create the message, make sure no apps are
-		 * watching this bridge anymore, and destroy the bridge's control
-		 * structure */
-		msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
-		ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
-		stasis_app_bridge_destroy(old_snapshot->uniqueid);
-	} else if (!old_snapshot) {
-		msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
-	}
-
-	if (!msg) {
-		return;
-	}
-
-	distribute_message(watching_apps, msg);
-}
-
-/*! \brief Callback used to merge two containers of applications */
-static int list_merge_cb(void *obj, void *arg, int flags)
-{
-	/* remove any current entries for this app */
-	ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
-	/* relink as the only entry */
-	ao2_link(arg, obj);
-	return 0;
-}
-
-/*! \brief Merge container src into container dst without modifying src */
-static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
-{
-	ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
-}
-
-/*! \brief Callback for adding to an app's bridges of interest */
-static int app_add_bridge_cb(void *obj, void *arg, int flags)
-{
-	app_add_bridge(obj, arg);
-	return 0;
-}
-
-/*! \brief Add interest in the given bridge to all apps in the container */
-static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
-{
-	RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
-	ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
-}
-
-static void sub_bridge_merge_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-	struct ast_bridge_merge_message *merge = stasis_message_data(message);
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
-	const struct timeval *tv = stasis_message_timestamp(message);
-
-	watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
-	if (watching_apps_to) {
-		update_apps_list(watching_apps_all, watching_apps_to);
-	}
-
-	watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
-	if (watching_apps_from) {
-		update_bridge_interest(watching_apps_from, merge->to->uniqueid);
-		update_apps_list(watching_apps_all, watching_apps_from);
-	}
-
-	if (!ao2_container_count(watching_apps_all)) {
-		return;
-	}
-
-	msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
-		"type", "BridgeMerged",
-		"timestamp", ast_json_timeval(*tv, NULL),
-		"bridge", ast_bridge_snapshot_to_json(merge->to),
-		"bridge_from", ast_bridge_snapshot_to_json(merge->from));
-
-	if (!msg) {
-		return;
-	}
-
-	distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_enter_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
-	RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-	struct ast_bridge_blob *obj = stasis_message_data(message);
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-	watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-	if (watching_apps_bridge) {
-		update_apps_list(watching_apps_all, watching_apps_bridge);
-	}
-
-	watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
-	if (watching_apps_channel) {
-		update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
-		update_apps_list(watching_apps_all, watching_apps_channel);
-	}
-
-	if (!ao2_container_count(watching_apps_all)) {
-		return;
-	}
-
-	msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
-		obj->channel, stasis_message_timestamp(message));
-
-	distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_leave_handler(void *data,
-		struct stasis_subscription *sub,
-		struct stasis_topic *topic,
-		struct stasis_message *message)
-{
-	RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
-	struct ast_bridge_blob *obj = stasis_message_data(message);
-	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-	watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-	if (!watching_apps_bridge) {
-		return;
-	}
-
-	msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
-		obj->channel, stasis_message_timestamp(message));
-
-	distribute_message(watching_apps_bridge, msg);
-}
-
 static int load_module(void)
 {
-	int r = 0;
-
-	apps_registry =
-		ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
+	apps_registry =	ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
+		app_compare);
 	if (apps_registry == NULL) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-					     control_hash, control_compare);
+	app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
+		control_compare);
 	if (app_controls == NULL) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-					     bridges_hash, bridges_compare);
-	if (app_bridges == NULL) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
+        app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
+		bridges_compare);
 
 	app_bridges_moh = ao2_container_alloc_hash(
 		AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
@@ -1246,52 +818,11 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
-	if (!channel_router) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
-	/* TODO: This could be handled a lot better. Instead of subscribing to
-	 * the one caching topic and filtering out messages by channel id, we
-	 * should have individual caching topics per-channel, with a shared
-	 * back-end cache. That would simplify a lot of what's going on right
-	 * here.
-	 */
-	r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
-	r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
-	r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
-	r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
-	if (r) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
-	if (!bridge_router) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
-	r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
-	r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
-	r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
-	r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
-	if (r) {
-		return AST_MODULE_LOAD_FAILURE;
-	}
-
 	return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
-	int r = 0;
-
-	stasis_message_router_unsubscribe_and_join(channel_router);
-	channel_router = NULL;
-
-	stasis_message_router_unsubscribe_and_join(bridge_router);
-	bridge_router = NULL;
-
 	ao2_cleanup(apps_registry);
 	apps_registry = NULL;
 
@@ -1304,7 +835,7 @@
 	ao2_cleanup(app_bridges_moh);
 	app_bridges_moh = NULL;
 
-	return r;
+	return 0;
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",

Modified: trunk/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/stasis/app.c?view=diff&rev=397820&r1=397819&r2=397820
==============================================================================
--- trunk/res/stasis/app.c (original)
+++ trunk/res/stasis/app.c Tue Aug 27 14:19:36 2013
@@ -29,50 +29,459 @@
 
 #include "app.h"
 
+#include "asterisk/callerid.h"
 #include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances.  Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances.  Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
 
 struct app {
+	/*! Aggregation topic for this application. */
+	struct stasis_topic *topic;
+	/*! Router for handling messages forwarded to \a topic. */
+	struct stasis_message_router *router;
+	/*! Subscription to watch for bridge merge messages */
+	struct stasis_subscription *bridge_merge_sub;
+	/*! Container of the channel forwards to this app's topic. */
+	struct ao2_container *forwards;
 	/*! Callback function for this application. */
 	stasis_app_cb handler;
 	/*! Opaque data to hand to callback function. */
 	void *data;
-	/*! List of channel identifiers this app instance is interested in */
-	struct ao2_container *channels;
-	/*! List of bridge identifiers this app instance owns */
-	struct ao2_container *bridges;
 	/*! Name of the Stasis application */
 	char name[];
 };
 
+/*!
+ * Subscription info for a particular channel/bridge.
+ *
+ * Bundles the two forwarding subscriptions (regular topic and caching
+ * topic) kept for one channel or bridge, keyed by \a id. Both forward
+ * pointers are expected to be NULL again by the time the object is
+ * destroyed; forwards_dtor() asserts this.
+ */
+struct app_forwards {
+	/*! Number of times this channel/bridge has been subscribed to */
+	int interested;
+
+	/*! Forward for the regular topic */
+	struct stasis_subscription *topic_forward;
+	/*! Forward for the caching topic */
+	struct stasis_subscription *topic_cached_forward;
+
+	/*! Unique id of the object being forwarded (flexible array member;
+	 *  storage for the string is added to the allocation size) */
+	char id[];
+};
+
+/*!
+ * Destructor handed to ao2_alloc() for struct app_forwards.
+ *
+ * Frees nothing itself; it only asserts that both forwards have already
+ * been torn down (pointers reset to NULL) before the object goes away.
+ */
+static void forwards_dtor(void *obj)
+{
+	struct app_forwards *forwards = obj;
+
+	ast_assert(forwards->topic_forward == NULL);
+	ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+/*!
+ * Cancel both forwards held by \a forwards and clear the pointers.
+ *
+ * Clearing the pointers is what lets the assertions in forwards_dtor()
+ * hold once the object is finally released.
+ */
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+	stasis_unsubscribe(forwards->topic_forward);
+	forwards->topic_forward = NULL;
+	stasis_unsubscribe(forwards->topic_cached_forward);
+	forwards->topic_cached_forward = NULL;
+}
+
+/*!
+ * Allocate an app_forwards object for the given unique id.
+ *
+ * No forwards are set up here; callers fill in the subscription pointers.
+ *
+ * \param app Application the forwards are for; only checked for NULL here.
+ * \param id Unique id of the channel/bridge; copied into the new object.
+ * \return New object owned by the caller, or NULL if \a app is NULL,
+ *         \a id fails the ast_strlen_zero() check, or allocation fails.
+ */
+static struct app_forwards *forwards_create(struct app *app,
+	const char *id)
+{
+	struct app_forwards *created;
+
+	if (!app || ast_strlen_zero(id)) {
+		return NULL;
+	}
+
+	/* Trailing storage holds the id string plus its terminator. */
+	created = ao2_alloc(sizeof(*created) + strlen(id) + 1, forwards_dtor);
+	if (created) {
+		strcpy(created->id, id);
+	}
+
+	return created;
+}
+
+/*!
+ * Forward a channel's topics to an app.
+ *
+ * Sets up two forwards into \c app->topic: one from the channel's regular
+ * topic and one from its caching topic. Either both forwards exist on
+ * return or none does.
+ *
+ * \param app Application whose topic receives the forwarded messages.
+ * \param chan Channel whose topics are forwarded.
+ * \return Forwards object owned by the caller, or NULL on bad arguments
+ *         or if either forward could not be created.
+ */
+static struct app_forwards *forwards_create_channel(struct app *app,
+	struct ast_channel *chan)
+{
+	struct app_forwards *fwd;
+
+	if (!app || !chan) {
+		return NULL;
+	}
+
+	fwd = forwards_create(app, ast_channel_uniqueid(chan));
+	if (!fwd) {
+		return NULL;
+	}
+
+	fwd->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+		app->topic);
+	if (fwd->topic_forward) {
+		fwd->topic_cached_forward = stasis_forward_all(
+			ast_channel_topic_cached(chan), app->topic);
+		if (fwd->topic_cached_forward) {
+			return fwd;
+		}
+
+		/* Half-subscribed is a bad thing */
+		stasis_unsubscribe(fwd->topic_forward);
+		fwd->topic_forward = NULL;
+	}
+
+	/* Failure: drop the only reference so the object is destroyed. */
+	ao2_ref(fwd, -1);
+	return NULL;
+}
+
+/*!
+ * Forward a bridge's topics to an app.
+ *
+ * Sets up two forwards into \c app->topic: one from the bridge's regular
+ * topic and one from its caching topic. Either both forwards exist on
+ * return or none does.
+ *
+ * \param app Application whose topic receives the forwarded messages.
+ * \param bridge Bridge whose topics are forwarded.
+ * \return Forwards object owned by the caller, or NULL on bad arguments
+ *         or if either forward could not be created.
+ */
+static struct app_forwards *forwards_create_bridge(struct app *app,
+	struct ast_bridge *bridge)
+{
+	struct app_forwards *fwd;
+
+	if (!app || !bridge) {
+		return NULL;
+	}
+
+	fwd = forwards_create(app, bridge->uniqueid);
+	if (!fwd) {
+		return NULL;
+	}
+
+	fwd->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+		app->topic);
+	if (fwd->topic_forward) {
+		fwd->topic_cached_forward = stasis_forward_all(
+			ast_bridge_topic_cached(bridge), app->topic);
+		if (fwd->topic_cached_forward) {
+			return fwd;
+		}
+
+		/* Half-subscribed is a bad thing */
+		stasis_unsubscribe(fwd->topic_forward);
+		fwd->topic_forward = NULL;
+	}
+
+	/* Failure: drop the only reference so the object is destroyed. */
+	ao2_ref(fwd, -1);
+	return NULL;
+}
+
+/*!
+ * Ordering callback comparing app_forwards objects by their id string.
+ *
+ * \param obj_left Always a struct app_forwards.
+ * \param obj_right A struct app_forwards (OBJ_POINTER), a full id string
+ *        (OBJ_KEY), or an id prefix (OBJ_PARTIAL_KEY), selected by \a flags.
+ * \param flags Search flags; only the three key-type bits are examined.
+ * \return Result of strcmp()/strncmp() on the ids; 0 for an unsupported
+ *         flag combination (after asserting).
+ */
+static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
+{
+	const struct app_forwards *lhs = obj_left;
+	const char *key;
+
+	switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+	case OBJ_POINTER:
+		/* Right side is a whole object: compare against its id. */
+		key = ((const struct app_forwards *) obj_right)->id;
+		return strcmp(lhs->id, key);
+	case OBJ_KEY:
+		key = obj_right;
+		return strcmp(lhs->id, key);
+	case OBJ_PARTIAL_KEY:
+		/*
+		 * We could also use a partial key struct containing a length
+		 * so strlen() does not get called for every comparison instead.
+		 */
+		key = obj_right;
+		return strncmp(lhs->id, key, strlen(key));
+	default:
+		/* Sort can only work on something with a full or partial key. */
+		ast_assert(0);
+		return 0;
+	}
+}
+
 static void app_dtor(void *obj)
 {
 	struct app *app = obj;
 
+	ast_verb(1, "Destroying Stasis app %s\n", app->name);
+
+	ast_assert(app->router == NULL);
+	ast_assert(app->bridge_merge_sub == NULL);
+
+	ao2_cleanup(app->topic);
+	app->topic = NULL;
+	ao2_cleanup(app->forwards);
+	app->forwards = NULL;
 	ao2_cleanup(app->data);
 	app->data = NULL;
-	ao2_cleanup(app->channels);
-	app->channels = NULL;
-	ao2_cleanup(app->bridges);
-	app->bridges = NULL;
+}
+
+/*!
+ * Default handler for messages forwarded to the app's topic.
+ *
+ * Any message that has a JSON representation is sent to the app as-is.
+ * When the message is the subscription's final message, the reference to
+ * the app carried in \a data is released (presumably the one taken when
+ * the handler was registered -- confirm against the registration code).
+ *
+ * \param data The struct app this subscription belongs to.
+ * \param sub Subscription the message arrived on.
+ * \param topic Topic the message was published to (unused).
+ * \param message Message to convert and deliver.
+ */
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+	struct stasis_topic *topic, struct stasis_message *message)
+{
+	struct app *app = data;
+	struct ast_json *json;
+
+	/* By default, send any message that has a JSON representation */
+	json = stasis_message_to_json(message);
+	if (json) {
+		app_send(app, json);
+		ast_json_unref(json);
+	}
+
+	/*
+	 * Release the app reference only after the last use of app above;
+	 * dropping it first could leave app_send() working on a destroyed
+	 * object if this was the last reference.
+	 */
+	if (stasis_subscription_final_message(sub, message)) {
+		ao2_cleanup(app);
+	}
+}
+
+/*!
+ * \brief Typedef for callbacks that get called on channel snapshot updates
+ *
+ * The callback is handed the old and new snapshots of the channel plus a
+ * timestamp, and returns a struct ast_json pointer.
+ */
+typedef struct ast_json *(*channel_snapshot_monitor)(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot,
+	const struct timeval *tv);
+
+/*!
+ * Build a JSON event made of a type, a timestamp and a channel snapshot.
+ *
+ * \param type Value for the event's "type" field.
+ * \param snapshot Channel snapshot serialized into the "channel" field.
+ * \param tv Time serialized into the "timestamp" field; dereferenced
+ *        unconditionally, so it must not be NULL.
+ * \return Whatever ast_json_pack() produces for the assembled object.
+ */
+static struct ast_json *simple_channel_event(
+	const char *type,
+	struct ast_channel_snapshot *snapshot,
+	const struct timeval *tv)
+{
+	struct ast_json *json_timestamp = ast_json_timeval(*tv, NULL);
+	struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot);
+
+	return ast_json_pack("{s: s, s: o, s: o}",
+		"type", type,
+		"timestamp", json_timestamp,
+		"channel", json_channel);
+}
+
+static struct ast_json *channel_created_event(
+	struct ast_channel_snapshot *snapshot,
+	const struct timeval *tv)
+{

[... 693 lines stripped ...]



More information about the asterisk-commits mailing list