[asterisk-commits] kmoore: branch kmoore/stasis-bridge_events r384984 - in /team/kmoore/stasis-b...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Apr 8 12:36:27 CDT 2013


Author: kmoore
Date: Mon Apr  8 12:36:23 2013
New Revision: 384984

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=384984
Log:
Multiple revisions 384947,384949-384950,384953,384983

........
  r384947 | kmoore | 2013-04-08 11:07:43 -0500 (Mon, 08 Apr 2013) | 2 lines
  
  Create rebase of stasis-channel_events on top of bridge_construction
........
  r384949 | kmoore | 2013-04-08 11:11:30 -0500 (Mon, 08 Apr 2013) | 2 lines
  
  svnmerge and automerge init
........
  r384950 | kmoore | 2013-04-08 11:12:23 -0500 (Mon, 08 Apr 2013) | 1 line
  
  commit stasis-channel_events work up to this point
........
  r384953 | root | 2013-04-08 11:18:02 -0500 (Mon, 08 Apr 2013) | 1 line
  
  automerge cancel
........
  r384983 | kmoore | 2013-04-08 12:34:35 -0500 (Mon, 08 Apr 2013) | 2 lines
  
  Pull in something I missed
........

Merged revisions 384947,384949-384950,384953,384983 from http://svn.asterisk.org/svn/asterisk/team/kmoore/stasis-bridging-channel_events

Modified:
    team/kmoore/stasis-bridge_events/   (props changed)
    team/kmoore/stasis-bridge_events/apps/app_stasis.c
    team/kmoore/stasis-bridge_events/res/res_stasis_websocket.c

Propchange: team/kmoore/stasis-bridge_events/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Apr  8 12:36:23 2013
@@ -1,1 +1,1 @@
-/team/kmoore/stasis-bridging-channel_events:1-384939
+/team/kmoore/stasis-bridging-channel_events:1-384983

Modified: team/kmoore/stasis-bridge_events/apps/app_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/apps/app_stasis.c?view=diff&rev=384984&r1=384983&r2=384984
==============================================================================
--- team/kmoore/stasis-bridge_events/apps/app_stasis.c (original)
+++ team/kmoore/stasis-bridge_events/apps/app_stasis.c Mon Apr  8 12:36:23 2013
@@ -37,7 +37,9 @@
 #include "asterisk/channel.h"
 #include "asterisk/module.h"
 #include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
 #include "asterisk/strings.h"
+#include "asterisk/callerid.h"
 
 /*** DOCUMENTATION
 	<application name="Stasis" language="en_US">
@@ -61,6 +63,9 @@
 /*! \brief Maximum number of arguments for the Stasis dialplan application */
 #define MAX_ARGS 128
 
+/*! \brief Number of buckets for the channels container for app instances */
+#define APP_CHANNELS_BUCKETS 7
+
 /*! \brief Dialplan application name */
 static const char *stasis = "Stasis";
 
@@ -83,6 +88,8 @@
 struct ao2_container *__apps_registry;
 
 struct ao2_container *__app_controls;
+
+struct stasis_message_router *app_channel_router;
 
 /*! Ref-counting accessor for the stasis applications container */
 static struct ao2_container *apps_registry(void)
@@ -102,6 +109,8 @@
 	stasis_app_cb handler;
 	/*! Opaque data to hand to callback function. */
 	void *data;
+	/*! List of channel identifiers this app instance is interested in */
+	struct ao2_container *channels;
 	/*! Name of the Stasis application */
 	char name[];
 };
@@ -112,12 +121,14 @@
 
 	ao2_cleanup(app->data);
 	app->data = NULL;
+	ao2_cleanup(app->channels);
+	app->channels = NULL;
 }
 
 /*! Constructor for \ref app. */
 static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
-	struct app *app;
+	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	size_t size;
 
 	ast_assert(name != NULL);
@@ -135,6 +146,12 @@
 	ao2_ref(data, +1);
 	app->data = data;
 
+	app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
+	if (!app->channels) {
+		return NULL;
+	}
+
+	ao2_ref(app, +1);
 	return app;
 }
 
@@ -159,6 +176,34 @@
 	} else {
 		return 0;
 	}
+}
+
+static int app_add_channel(struct app* app, const struct ast_channel *chan)
+{
+	RAII_VAR(char *, ao2_channel_id, NULL, ao2_cleanup);
+	const char *uniqueid;
+	ast_assert(chan != NULL);
+	ast_assert(app != NULL);
+
+	uniqueid = ast_channel_uniqueid(chan);
+
+	ao2_channel_id = ao2_alloc(strlen(uniqueid) + 1, NULL);
+	if (!ao2_channel_id) {
+		return -1;
+	}
+
+	/* safe strcpy */
+	strcpy(ao2_channel_id, uniqueid);
+	ao2_link(app->channels, ao2_channel_id);
+	return 0;
+}
+
+static void app_remove_channel(struct app* app, const struct ast_channel *chan)
+{
+	ast_assert(chan != NULL);
+	ast_assert(app != NULL);
+
+	ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
 }
 
 /*!
@@ -331,25 +376,249 @@
 	return 0;
 }
 
-static void sub_handler(void *data, struct stasis_subscription *sub,
-			struct stasis_topic *topic,
-			struct stasis_message *message)
-{
-	struct app *app = data;
-	if (ast_channel_snapshot_type() == stasis_message_type(message)) {
+static int app_watching_channel_cb(void *obj, void *arg, int flags)
+{
+	RAII_VAR(char *, uniqueid, NULL, ao2_cleanup);
+	struct app *app = obj;
+	struct ast_channel_snapshot *snapshot = arg;
+
+	uniqueid = ao2_find(app->channels, snapshot->uniqueid, OBJ_KEY);
+	return uniqueid ? CMP_MATCH : 0;
+}
+
+static struct ao2_container *get_watching_apps(struct ast_channel_snapshot *snapshot)
+{
+	RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
+	struct ao2_container *watching_apps;
+	RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
+	ast_assert(snapshot != NULL);
+	ast_assert(apps != NULL);
+
+	watching_apps_iter = ao2_callback(apps, OBJ_MULTIPLE, app_watching_channel_cb, snapshot);
+	watching_apps = watching_apps_iter->c;
+
+	if (!ao2_container_count(watching_apps)) {
+		return NULL;
+	}
+
+	ao2_ref(watching_apps, +1);
+	return watching_apps_iter->c;
+}
+
+/*! \brief Typedef for callbacks that get called on channel snapshot updates */
+typedef struct ast_json *(*snapshot_monitor)(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot);
+
+/*! \brief Handle channel state changes */
+static struct ast_json *channel_state(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+	char *event_name = NULL;
+
+	if (!old_snapshot) {
+		event_name = "channel-event-create";
+	} else if (!new_snapshot) {
+		event_name = "channel-event-destroy";
+		json = ast_json_pack("{s: i, s: s}",
+			"Cause", old_snapshot->hangupcause,
+			"Cause-txt", ast_cause2str(old_snapshot->hangupcause));
+		if (!json) {
+			return NULL;
+		}
+	} else if (old_snapshot->state != new_snapshot->state) {
+		event_name = "channel-event-state";
+	}
+
+	if (!event_name) {
+		return NULL;
+	}
+
+	return app_event_create(event_name, new_snapshot ? new_snapshot : old_snapshot, json);
+}
+
+/*!
+ * \brief Compares the context, exten and priority of two snapshots.
+ * \param old_snapshot Old snapshot
+ * \param new_snapshot New snapshot
+ * \return True (non-zero) if context, exten or priority are identical.
+ * \return False (zero) if context, exten and priority changed.
+ */
+static inline int cep_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot)
+{
+	ast_assert(old_snapshot != NULL);
+	ast_assert(new_snapshot != NULL);
+
+	/* We actually get some snapshots with CEP set, but before the
+	 * application is set. Since empty application is invalid, we treat
+	 * setting the application from nothing as a CEP change.
+	 */
+	if (ast_strlen_zero(old_snapshot->appl) &&
+	    !ast_strlen_zero(new_snapshot->appl)) {
+		return 0;
+	}
+
+	return old_snapshot->priority == new_snapshot->priority &&
+		strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
+		strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
+}
+
+static struct ast_json *channel_dialplan(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+	/* No Newexten event on cache clear */
+	if (!new_snapshot) {
+		return NULL;
+	}
+
+	/* Empty application is not valid for a Newexten event */
+	if (ast_strlen_zero(new_snapshot->appl)) {
+		return NULL;
+	}
+
+	if (old_snapshot && cep_equal(old_snapshot, new_snapshot)) {
+		return NULL;
+	}
+
+	json = ast_json_pack("{s: s, s: s}",
+		"Application", new_snapshot->appl,
+		"ApplicationData", new_snapshot->data);
+	if (!json) {
+		return NULL;
+	}
+
+	return app_event_create("channel-event-dialplan", new_snapshot, json);
+}
+
+/*!
+ * \brief Compares the callerid info of two snapshots.
+ * \param old_snapshot Old snapshot
+ * \param new_snapshot New snapshot
+ * \return True (non-zero) if callerid are identical.
+ * \return False (zero) if callerid changed.
+ */
+static inline int caller_id_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot)
+{
+	ast_assert(old_snapshot != NULL);
+	ast_assert(new_snapshot != NULL);
+	return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
+		strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
+}
+
+static struct ast_json *channel_callerid(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+	/* No NewCallerid event on cache clear or first event */
+	if (!old_snapshot || !new_snapshot) {
+		return NULL;
+	}
+
+	if (caller_id_equal(old_snapshot, new_snapshot)) {
+		return NULL;
+	}
+
+	json = ast_json_pack("{s: i, s: s}",
+		"CallerPresentation", new_snapshot->caller_pres,
+		"CallerPresentation-txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
+	if (!json) {
+		return NULL;
+	}
+
+	return app_event_create("channel-event-callerid", new_snapshot, json);
+}
+
+static struct ast_json *channel_snapshot(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	if (!new_snapshot) {
+		return NULL;
+	}
+
+	return app_event_create("channel-snapshot", new_snapshot, NULL);
+}
+
+snapshot_monitor monitors[] = {
+	channel_snapshot,
+	channel_state,
+	channel_dialplan,
+	channel_callerid
+};
+
+static int app_send_cb(void *obj, void *arg, int flags)
+{
+	struct app *app = obj;
+	struct ast_json *msg = arg;
+
+	app_send(app, msg);
+	return 0;
+}
+
+static void sub_snapshot_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+	struct stasis_cache_update *update = stasis_message_data(message);
+	struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
+	struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+	int i;
+
+	watching_apps = get_watching_apps(new_snapshot ? new_snapshot : old_snapshot);
+	if (!watching_apps) {
+		return;
+	}
+
+	for (i = 0; i < ARRAY_LEN(monitors); ++i) {
 		RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-		struct ast_channel_snapshot *snapshot =
-			stasis_message_data(message);
-
-		msg = app_event_create("channel-state-change", snapshot, NULL);
-		if (!msg) {
-			return;
+
+		msg = monitors[i](old_snapshot, new_snapshot);
+		if (msg) {
+			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
 		}
-		app_send(app, msg);
-	}
-	if (stasis_subscription_final_message(sub, message)) {
-		ao2_cleanup(data);
-	}
+	}
+}
+
+static void sub_blob_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+	RAII_VAR(struct ast_str *, event_name, ast_str_create(32), ast_free);
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+	struct ast_channel_blob *obj = stasis_message_data(message);
+
+	if (!obj->snapshot) {
+		return;
+	}
+
+	watching_apps = get_watching_apps(obj->snapshot);
+	if (!watching_apps) {
+		return;
+	}
+
+	ast_str_set(&event_name, 0, "channel-event-%s", ast_channel_blob_json_type(obj));
+
+	msg = app_event_create(ast_str_buffer(event_name), obj->snapshot, obj->blob);
+	if (!msg) {
+		return;
+	}
+
+	ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
 }
 
 /*!
@@ -375,7 +644,6 @@
 	RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
 	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
-	RAII_VAR(struct stasis_subscription *, subscription, NULL, stasis_unsubscribe);
 	int res = 0;
 	char *parse = NULL;
 	int hungup = 0;
@@ -415,17 +683,15 @@
 		ao2_link(controls, control);
 	}
 
-	subscription = stasis_subscribe(ast_channel_topic(chan), sub_handler, app);
-	if (subscription == NULL) {
-		ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n", args.app_name, ast_channel_name(chan));
-		return -1;
-	}
-	ao2_ref(app, +1); /* subscription now has a reference */
-
 	res = send_start_msg(app, chan, args.argc - 1, args.app_argv);
 	if (res != 0) {
 		ast_log(LOG_ERROR, "Error sending start message to %s\n", args.app_name);
 		return res;
+	}
+
+	if (app_add_channel(app, chan)) {
+		ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), args.app_name);
+		return -1;
 	}
 
 	while (!hungup && !control_continue_test_and_reset(control) && ast_waitfor(chan, -1) > -1) {
@@ -448,6 +714,7 @@
 		}
 	}
 
+	app_remove_channel(app, chan);
 	res = send_end_msg(app, chan);
 	if (res != 0) {
 		ast_log(LOG_ERROR, "Error sending end message to %s\n", args.app_name);
@@ -532,6 +799,14 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
+	app_channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
+	if (!app_channel_router) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	r |= stasis_message_router_add(app_channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
+	r |= stasis_message_router_add(app_channel_router, ast_channel_blob_type(), sub_blob_handler, NULL);
+
 	r |= ast_register_application_xml(stasis, app_stasis_exec);
 	return r;
 }
@@ -539,6 +814,9 @@
 static int unload_module(void)
 {
 	int r = 0;
+
+	stasis_message_router_unsubscribe(app_channel_router);
+	app_channel_router = NULL;
 
 	ao2_cleanup(__apps_registry);
 	__apps_registry = NULL;

Modified: team/kmoore/stasis-bridge_events/res/res_stasis_websocket.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/res/res_stasis_websocket.c?view=diff&rev=384984&r1=384983&r2=384984
==============================================================================
--- team/kmoore/stasis-bridge_events/res/res_stasis_websocket.c (original)
+++ team/kmoore/stasis-bridge_events/res/res_stasis_websocket.c Mon Apr  8 12:36:23 2013
@@ -52,10 +52,6 @@
  */
 #define APPS_NUM_BUCKETS 7
 
-struct websocket_app {
-	char *name;
-};
-
 /*!
  * \internal
  * \brief Helper to write a JSON object to a WebSocket.
@@ -78,35 +74,6 @@
 				   strlen(str));
 }
 
-/*! Hash function for websocket_app */
-static int hash_app(const void *obj, const int flags)
-{
-	const struct websocket_app *app = obj;
-	const char *name = flags & OBJ_KEY ? obj : app->name;
-
-	return ast_str_hash(name);
-}
-
-/*! Comparison function for websocket_app */
-static int compare_app(void *lhs, void *rhs, int flags)
-{
-	const struct websocket_app *lhs_app = lhs;
-	const struct websocket_app *rhs_app = rhs;
-	const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
-
-	if (strcmp(lhs_app->name, rhs_name) == 0) {
-		return CMP_MATCH;
-	} else {
-		return 0;
-	}
-}
-
-static void app_dtor(void *obj)
-{
-	struct websocket_app *app = obj;
-	ast_free(app->name);
-}
-
 struct stasis_ws_session_info {
 	struct ast_websocket *ws_session;
 	struct ao2_container *websocket_apps;
@@ -130,7 +97,7 @@
 
 	session->ws_session = ws_session;
 	session->websocket_apps =
-		ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
+		ast_str_container_alloc(APPS_NUM_BUCKETS);
 
 	if (!session->websocket_apps) {
 		return NULL;
@@ -152,12 +119,12 @@
 static void session_shutdown(struct stasis_ws_session_info *session)
 {
         struct ao2_iterator i;
-	struct websocket_app *app;
+	char *app;
 	SCOPED_AO2LOCK(lock, session);
 
 	i = ao2_iterator_init(session->websocket_apps, 0);
 	while ((app = ao2_iterator_next(&i))) {
-		stasis_app_unregister(app->name);
+		stasis_app_unregister(app);
 		ao2_cleanup(app);
 	}
 	ao2_iterator_destroy(&i);
@@ -210,14 +177,16 @@
 		return -1;
 	}
 	while ((app_name = strsep(&apps, ","))) {
-		RAII_VAR(struct websocket_app *, app, NULL, ao2_cleanup);
-
-		app = ao2_alloc(sizeof(*app), app_dtor);
+		RAII_VAR(char *, app, NULL, ao2_cleanup);
+
+		app = ao2_alloc(strlen(app_name) + 1, NULL);
 		if (!app) {
 			websocket_write_json(session->ws_session, oom_json);
 			return -1;
 		}
-		app->name = ast_strdup(app_name);
+
+		/* safe strcpy */
+		strcpy(app, app_name);
 		ao2_link(session->websocket_apps, app);
 
 		stasis_app_register(app_name, app_handler, session);




More information about the asterisk-commits mailing list