[svn-commits] dlee: branch dlee/playback r388300 - in /team/dlee/playback: ./ apps/ include...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Fri May 10 09:09:16 CDT 2013


Author: dlee
Date: Fri May 10 09:09:09 2013
New Revision: 388300

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=388300
Log:
Add channel events for res_stasis apps

This change adds a framework in res_stasis for handling events from
channel topics. JSON event generation and validation code is generated
from the event documentation in rest-api/api-docs/events.json. This
simplifies building events, keeps them consistent, and ensures that
accurate documentation is available for all events that are received
by res_stasis applications.

The UserEvent application has been refactored, along with the code
that handles userevent channel blob events, so that the headers are
passed as key/value pairs in the JSON blob. As a side effect,
app_userevent now handles duplicate keys by overwriting the previous
value.

Review: https://reviewboard.asterisk.org/r/2428/
(closes issue ASTERISK-21180)
Patch-By: Kinsey Moore <kmoore at digium.com>
........

Merged revisions 388275 from http://svn.asterisk.org/svn/asterisk/trunk
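
The key/value handling described in the log above can be sketched in
isolation. This is only an illustration under stated assumptions, not the
committed code: kv_set, kv_set_put, kv_set_add_arg and kv_set_get are
hypothetical names, a small fixed-size array stands in for the JSON blob,
and strchr() is used where the patch uses strsep(). As in the patch, only
the value is stripped of whitespace, arguments without a ':' are skipped,
and a repeated key overwrites the earlier value.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

#define MAX_PAIRS 16

/* Hypothetical flat key/value store standing in for the JSON object. */
struct kv_pair {
	const char *key;
	const char *value;
};

struct kv_set {
	struct kv_pair pairs[MAX_PAIRS];
	size_t count;
};

/* Trim leading and trailing whitespace in place (stand-in for ast_strip). */
static char *strip(char *s)
{
	char *end;

	while (isspace((unsigned char) *s)) {
		s++;
	}
	end = s + strlen(s);
	while (end > s && isspace((unsigned char) end[-1])) {
		end--;
	}
	*end = '\0';
	return s;
}

/* Store value under key, overwriting any earlier value for the same key. */
static int kv_set_put(struct kv_set *set, const char *key, const char *value)
{
	size_t i;

	for (i = 0; i < set->count; i++) {
		if (strcmp(set->pairs[i].key, key) == 0) {
			set->pairs[i].value = value;
			return 0;
		}
	}
	if (set->count == MAX_PAIRS) {
		return -1;
	}
	set->pairs[set->count].key = key;
	set->pairs[set->count].value = value;
	set->count++;
	return 0;
}

/* Split one "key: value" argument in place and store the pair.
 * Arguments without a ':' are silently skipped. */
static int kv_set_add_arg(struct kv_set *set, char *arg)
{
	char *colon;

	assert(set != NULL);
	assert(arg != NULL);

	colon = strchr(arg, ':');
	if (!colon) {
		return 0;
	}
	*colon = '\0';
	return kv_set_put(set, arg, strip(colon + 1));
}

/* Look up a key; returns NULL if it was never set. */
static const char *kv_set_get(const struct kv_set *set, const char *key)
{
	size_t i;

	for (i = 0; i < set->count; i++) {
		if (strcmp(set->pairs[i].key, key) == 0) {
			return set->pairs[i].value;
		}
	}
	return NULL;
}
```

Feeding the arguments "color: red" and then "color: blue" through
kv_set_add_arg() leaves a single "color" entry whose value is "blue".
The argument strings must be writable and must outlive the set, since the
sketch stores pointers into them rather than copies.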

Added:
    team/dlee/playback/rest-api-templates/event_function_decl.mustache
      - copied unchanged from r388275, trunk/rest-api-templates/event_function_decl.mustache
Modified:
    team/dlee/playback/   (props changed)
    team/dlee/playback/CHANGES
    team/dlee/playback/apps/app_userevent.c
    team/dlee/playback/include/asterisk/stasis_channels.h
    team/dlee/playback/main/manager_channels.c
    team/dlee/playback/main/stasis_channels.c
    team/dlee/playback/res/res_stasis.c
    team/dlee/playback/res/res_stasis_http_events.c
    team/dlee/playback/res/res_stasis_websocket.c
    team/dlee/playback/res/stasis_http/resource_channels.h
    team/dlee/playback/res/stasis_http/resource_events.h
    team/dlee/playback/res/stasis_http/resource_recordings.h
    team/dlee/playback/res/stasis_http/resource_sounds.h
    team/dlee/playback/rest-api-templates/asterisk_processor.py
    team/dlee/playback/rest-api-templates/res_stasis_http_resource.c.mustache
    team/dlee/playback/rest-api-templates/stasis_http_resource.h.mustache
    team/dlee/playback/rest-api-templates/swagger_model.py
    team/dlee/playback/rest-api/api-docs/events.json

Propchange: team/dlee/playback/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri May 10 09:09:09 2013
@@ -1,1 +1,1 @@
-/trunk:1-388254
+/trunk:1-388299

Modified: team/dlee/playback/CHANGES
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/CHANGES?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/CHANGES (original)
+++ team/dlee/playback/CHANGES Fri May 10 09:09:09 2013
@@ -152,6 +152,12 @@
  * All future modules which utilize Sorcery for object persistence must have a
    column named "id" within their schema when using the Sorcery realtime module.
    This column must be able to contain a string of up to 128 characters in length.
+
+app_userevent
+------------------
+ * UserEvent will now handle duplicate keys by overwriting the previous value
+   assigned to the key. UserEvent invocations will also be distributed to any
+   interested res_stasis applications.
 
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 10 to Asterisk 11 --------------------

Modified: team/dlee/playback/apps/app_userevent.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/apps/app_userevent.c?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/apps/app_userevent.c (original)
+++ team/dlee/playback/apps/app_userevent.c Fri May 10 09:09:09 2013
@@ -39,23 +39,28 @@
 /*** DOCUMENTATION
 	<application name="UserEvent" language="en_US">
 		<synopsis>
-			Send an arbitrary event to the manager interface.
+			Send an arbitrary user-defined event to parties interested in a channel (AMI users and relevant res_stasis applications).
 		</synopsis>
 		<syntax>
 			<parameter name="eventname" required="true" />
 			<parameter name="body" />
 		</syntax>
 		<description>
-			<para>Sends an arbitrary event to the manager interface, with an optional
+			<para>Sends an arbitrary event to interested parties, with an optional
 			<replaceable>body</replaceable> representing additional arguments. The
 			<replaceable>body</replaceable> may be specified as
-			a <literal>,</literal> delimited list of headers. Each additional
-			argument will be placed on a new line in the event. The format of the
-			event will be:</para>
+			a <literal>,</literal> delimited list of key:value pairs.</para>
+			<para>For AMI, each additional argument will be placed on a new line in
+			the event and the format of the event will be:</para>
 			<para>    Event: UserEvent</para>
 			<para>    UserEvent: &lt;specified event name&gt;</para>
 			<para>    [body]</para>
-			<para>If no <replaceable>body</replaceable> is specified, only Event and UserEvent headers will be present.</para>
+			<para>If no <replaceable>body</replaceable> is specified, only Event and
+			UserEvent headers will be present.</para>
+			<para>For res_stasis applications, the event will be provided as a JSON
+			blob with additional arguments appearing as keys in the object and the
+			<replaceable>eventname</replaceable> under the
+			<literal>eventname</literal> key.</para>
 		</description>
 	</application>
  ***/
@@ -70,7 +75,6 @@
 		AST_APP_ARG(eventname);
 		AST_APP_ARG(extra)[100];
 	);
-	RAII_VAR(struct ast_str *, body, ast_str_create(16), ast_free);
 	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
@@ -79,25 +83,37 @@
 		return -1;
 	}
 
-	if (!body) {
-		ast_log(LOG_WARNING, "Unable to allocate buffer\n");
-		return -1;
-	}
-
 	parse = ast_strdupa(data);
 
 	AST_STANDARD_APP_ARGS(args, parse);
 
-	for (x = 0; x < args.argc - 1; x++) {
-		ast_str_append(&body, 0, "%s\r\n", args.extra[x]);
+	blob = ast_json_pack("{s: s, s: s}",
+			     "type", "userevent",
+			     "eventname", args.eventname);
+	if (!blob) {
+		return -1;
 	}
 
-	blob = ast_json_pack("{s: s, s: s}",
-			     "eventname", args.eventname,
-			     "body", ast_str_buffer(body));
-	if (!blob) {
-		ast_log(LOG_WARNING, "Unable to create message buffer\n");
-		return -1;
+	for (x = 0; x < args.argc - 1; x++) {
+		char *key, *value = args.extra[x];
+		struct ast_json *json_value;
+
+		key = strsep(&value, ":");
+		if (!value) {
+			/* no ':' in string? */
+			continue;
+		}
+
+		value = ast_strip(value);
+		json_value = ast_json_string_create(value);
+		if (!json_value) {
+			return -1;
+		}
+
+		/* ref stolen by ast_json_object_set */
+		if (ast_json_object_set(blob, key, json_value)) {
+			return -1;
+		}
 	}
 
 	msg = ast_channel_blob_create(

Modified: team/dlee/playback/include/asterisk/stasis_channels.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/include/asterisk/stasis_channels.h?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/include/asterisk/stasis_channels.h (original)
+++ team/dlee/playback/include/asterisk/stasis_channels.h Fri May 10 09:09:09 2013
@@ -344,6 +344,34 @@
 struct ast_json *ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot);
 
 /*!
+ * \brief Compares the context, exten and priority of two snapshots.
+ * \since 12
+ *
+ * \param old_snapshot Old snapshot
+ * \param new_snapshot New snapshot
+ *
+ * \return True (non-zero) if context, exten and priority are identical.
+ * \return False (zero) if context, exten or priority changed.
+ */
+int ast_channel_snapshot_cep_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot);
+
+/*!
+ * \brief Compares the callerid info of two snapshots.
+ * \since 12
+ *
+ * \param old_snapshot Old snapshot
+ * \param new_snapshot New snapshot
+ *
+ * \return True (non-zero) if callerid are identical.
+ * \return False (zero) if callerid changed.
+ */
+int ast_channel_snapshot_caller_id_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot);
+
+/*!
  * \brief Dispose of the stasis channel topics and message types
  */
 void ast_stasis_channels_shutdown(void);

Modified: team/dlee/playback/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/main/manager_channels.c?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/main/manager_channels.c (original)
+++ team/dlee/playback/main/manager_channels.c Fri May 10 09:09:09 2013
@@ -402,34 +402,6 @@
 	return NULL;
 }
 
-/*!
- * \brief Compares the context, exten and priority of two snapshots.
- * \param old_snapshot Old snapshot
- * \param new_snapshot New snapshot
- * \return True (non-zero) if context, exten or priority are identical.
- * \return False (zero) if context, exten and priority changed.
- */
-static inline int cep_equal(
-	const struct ast_channel_snapshot *old_snapshot,
-	const struct ast_channel_snapshot *new_snapshot)
-{
-	ast_assert(old_snapshot != NULL);
-	ast_assert(new_snapshot != NULL);
-
-	/* We actually get some snapshots with CEP set, but before the
-	 * application is set. Since empty application is invalid, we treat
-	 * setting the application from nothing as a CEP change.
-	 */
-	if (ast_strlen_zero(old_snapshot->appl) &&
-	    !ast_strlen_zero(new_snapshot->appl)) {
-		return 0;
-	}
-
-	return old_snapshot->priority == new_snapshot->priority &&
-		strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
-		strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
-}
-
 static struct snapshot_manager_event *channel_newexten(
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
@@ -444,7 +416,7 @@
 		return NULL;
 	}
 
-	if (old_snapshot && cep_equal(old_snapshot, new_snapshot)) {
+	if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
 		return NULL;
 	}
 
@@ -459,23 +431,6 @@
 		new_snapshot->data);
 }
 
-/*!
- * \brief Compares the callerid info of two snapshots.
- * \param old_snapshot Old snapshot
- * \param new_snapshot New snapshot
- * \return True (non-zero) if callerid are identical.
- * \return False (zero) if callerid changed.
- */
-static inline int caller_id_equal(
-	const struct ast_channel_snapshot *old_snapshot,
-	const struct ast_channel_snapshot *new_snapshot)
-{
-	ast_assert(old_snapshot != NULL);
-	ast_assert(new_snapshot != NULL);
-	return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
-		strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
-}
-
 static struct snapshot_manager_event *channel_new_callerid(
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
@@ -485,7 +440,7 @@
 		return NULL;
 	}
 
-	if (caller_id_equal(old_snapshot, new_snapshot)) {
+	if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
 		return NULL;
 	}
 
@@ -587,19 +542,62 @@
 		      variable, value);
 }
 
+/*!
+ * \brief Callback used to determine whether a key should be skipped when converting a JSON object to a manager blob
+ * \param key Key from JSON blob to be evaluated
+ * \retval non-zero if the key should be excluded
+ * \retval zero if the key should not be excluded
+ */
+typedef int (*key_exclusion_cb)(const char *key);
+
+static struct ast_str *manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb)
+{
+	struct ast_str *output_str = ast_str_create(32);
+	struct ast_json_iter *blob_iter = ast_json_object_iter(blob);
+	if (!output_str || !blob_iter) {
+		return NULL;
+	}
+
+	do {
+		const char *key = ast_json_object_iter_key(blob_iter);
+		const char *value = ast_json_string_get(ast_json_object_iter_value(blob_iter));
+		if (exclusion_cb && exclusion_cb(key)) {
+			continue;
+		}
+
+		ast_str_append(&output_str, 0, "%s: %s\r\n", key, value);
+		if (!output_str) {
+			return NULL;
+		}
+	} while ((blob_iter = ast_json_object_iter_next(blob, blob_iter)));
+
+	return output_str;
+}
+
+static int userevent_exclusion_cb(const char *key)
+{
+	if (!strcmp("type", key)) {
+		return 1;
+	}
+	if (!strcmp("eventname", key)) {
+		return 1;
+	}
+	return 0;
+}
+
 static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
 	struct stasis_topic *topic, struct stasis_message *message)
 {
 	struct ast_channel_blob *obj = stasis_message_data(message);
 	RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
+	RAII_VAR(struct ast_str *, body, NULL, ast_free);
 	const char *eventname;
-	const char *body;
 
 	eventname = ast_json_string_get(ast_json_object_get(obj->blob, "eventname"));
-	body = ast_json_string_get(ast_json_object_get(obj->blob, "body"));
+	body = manager_str_from_json_object(obj->blob, userevent_exclusion_cb);
 	channel_event_string = ast_manager_build_channel_state_string(obj->snapshot);
 
-	if (!channel_event_string) {
+	if (!channel_event_string || !body) {
 		return;
 	}
 
@@ -621,7 +619,7 @@
 		      "%s"
 		      "UserEvent: %s\r\n"
 		      "%s",
-		      ast_str_buffer(channel_event_string), eventname, body);
+		      ast_str_buffer(channel_event_string), eventname, ast_str_buffer(body));
 }
 
 static void channel_hangup_request_cb(void *data,
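
The manager_channels.c change above turns a JSON object back into AMI
"Key: Value" lines and uses a callback to leave out keys that are already
reported elsewhere in the event. A minimal standalone sketch of that
pattern follows. It makes several assumptions: struct header and
headers_to_ami_body are hypothetical names, a plain array replaces the
JSON object iterator, and a caller-supplied buffer replaces ast_str.
Only the exclusion callback mirrors the patch directly.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical key/value pair standing in for one JSON object member. */
struct header {
	const char *key;
	const char *value;
};

/* Returns non-zero if the key should be left out of the output. */
typedef int (*key_exclusion_cb)(const char *key);

/* Same policy as the patch: "type" and "eventname" are not repeated. */
static int userevent_exclusion_cb(const char *key)
{
	return strcmp(key, "type") == 0 || strcmp(key, "eventname") == 0;
}

/* Append "key: value\r\n" for every header that is not excluded.
 * Returns the number of bytes written (without the terminating NUL),
 * or -1 if the output buffer is too small. */
static int headers_to_ami_body(const struct header *headers, size_t count,
	key_exclusion_cb exclusion_cb, char *out, size_t out_len)
{
	size_t used = 0;
	size_t i;

	assert(out != NULL && out_len > 0);
	out[0] = '\0';

	for (i = 0; i < count; i++) {
		int n;

		if (exclusion_cb && exclusion_cb(headers[i].key)) {
			continue;
		}

		n = snprintf(out + used, out_len - used, "%s: %s\r\n",
			headers[i].key, headers[i].value);
		if (n < 0 || (size_t) n >= out_len - used) {
			return -1;
		}
		used += (size_t) n;
	}

	return (int) used;
}
```

Passing NULL as the callback includes every header. Keeping the exclusion
policy in a separate callback lets the same conversion routine serve other
event types that need to hide a different set of keys.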

Modified: team/dlee/playback/main/stasis_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/main/stasis_channels.c?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/main/stasis_channels.c (original)
+++ team/dlee/playback/main/stasis_channels.c Fri May 10 09:09:09 2013
@@ -520,6 +520,37 @@
 	return ast_json_ref(json_chan);
 }
 
+int ast_channel_snapshot_cep_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot)
+{
+	ast_assert(old_snapshot != NULL);
+	ast_assert(new_snapshot != NULL);
+
+	/* We actually get some snapshots with CEP set, but before the
+	 * application is set. Since empty application is invalid, we treat
+	 * setting the application from nothing as a CEP change.
+	 */
+	if (ast_strlen_zero(old_snapshot->appl) &&
+	    !ast_strlen_zero(new_snapshot->appl)) {
+		return 0;
+	}
+
+	return old_snapshot->priority == new_snapshot->priority &&
+		strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
+		strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
+}
+
+int ast_channel_snapshot_caller_id_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot)
+{
+	ast_assert(old_snapshot != NULL);
+	ast_assert(new_snapshot != NULL);
+	return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
+		strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
+}
+
 void ast_stasis_channels_shutdown(void)
 {
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);

Modified: team/dlee/playback/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/res/res_stasis.c?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/res/res_stasis.c (original)
+++ team/dlee/playback/res/res_stasis.c Fri May 10 09:09:09 2013
@@ -56,12 +56,15 @@
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #include "asterisk/astobj2.h"
+#include "asterisk/callerid.h"
 #include "asterisk/module.h"
 #include "asterisk/stasis_app_impl.h"
 #include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_message_router.h"
 #include "asterisk/strings.h"
 #include "stasis/app.h"
 #include "stasis/control.h"
+#include "stasis_http/resource_events.h"
 
 #include "stasis/command.h"
 
@@ -80,15 +83,35 @@
  */
 #define CONTROLS_NUM_BUCKETS 127
 
+/*!
+ * \brief Number of buckets for the channels container for app instances.  Remember
+ * to keep it a prime number!
+ */
+#define APP_CHANNELS_BUCKETS 7
+
+/*!
+ * \brief Number of buckets for the blob_handlers container.  Remember to keep
+ * it a prime number!
+ */
+#define BLOB_HANDLER_BUCKETS 7
+
+/*!
+ * \brief Stasis application container.
+ */
 struct ao2_container *apps_registry;
 
 struct ao2_container *app_controls;
+
+/*! \brief Message router for the channel caching topic */
+struct stasis_message_router *channel_router;
 
 struct app {
 	/*! Callback function for this application. */
 	stasis_app_cb handler;
 	/*! Opaque data to hand to callback function. */
 	void *data;
+	/*! List of channel identifiers this app instance is interested in */
+	struct ao2_container *channels;
 	/*! Name of the Stasis application */
 	char name[];
 };
@@ -99,11 +122,13 @@
 
 	ao2_cleanup(app->data);
 	app->data = NULL;
+	ao2_cleanup(app->channels);
+	app->channels = NULL;
 }
 
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
-	struct app *app;
+	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	size_t size;
 
 	ast_assert(name != NULL);
@@ -121,6 +146,12 @@
 	ao2_ref(data, +1);
 	app->data = data;
 
+	app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
+	if (!app->channels) {
+		return NULL;
+	}
+
+	ao2_ref(app, +1);
 	return app;
 }
 
@@ -146,6 +177,27 @@
 	} else {
 		return 0;
 	}
+}
+
+static int app_add_channel(struct app* app, const struct ast_channel *chan)
+{
+	const char *uniqueid;
+	ast_assert(chan != NULL);
+	ast_assert(app != NULL);
+
+	uniqueid = ast_channel_uniqueid(chan);
+	if (!ast_str_container_add(app->channels, uniqueid)) {
+		return -1;
+	}
+	return 0;
+}
+
+static void app_remove_channel(struct app* app, const struct ast_channel *chan)
+{
+	ast_assert(chan != NULL);
+	ast_assert(app != NULL);
+
+	ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
 }
 
 /*!
@@ -333,6 +385,9 @@
 	stasis_app_send_command_async(control, app_control_continue, NULL);
 }
 
+/*! \brief Typedef for blob handler callbacks */
+typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
+
 static int OK = 0;
 static int FAIL = -1;
 
@@ -364,43 +419,11 @@
 	return 0;
 }
 
-static struct ast_json *event_create(
-	const char *event_name,
-	const struct ast_channel_snapshot *snapshot,
-	const struct ast_json *extra_info)
-{
-	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
-	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
-
-	if (extra_info) {
-		event = ast_json_deep_copy(extra_info);
-	} else {
-		event = ast_json_object_create();
-	}
-
-	if (snapshot) {
-		int ret;
-
-		/* Mustn't already have a channel field */
-		ast_assert(ast_json_object_get(event, "channel") == NULL);
-
-		ret = ast_json_object_set(
-			event,
-			"channel", ast_channel_snapshot_to_json(snapshot));
-		if (ret != 0) {
-			return NULL;
-		}
-	}
-
-	message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
-
-	return ast_json_ref(message);
-}
-
 int app_send_start_msg(struct app *app, struct ast_channel *chan,
 	int argc, char *argv[])
 {
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
 	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
 
 	struct ast_json *json_args;
@@ -414,19 +437,13 @@
 		return -1;
 	}
 
-	msg = ast_json_pack("{s: {s: [], s: o}}",
-			    "stasis-start",
-			    "args",
-			    "channel", ast_channel_snapshot_to_json(snapshot));
-
-	if (!msg) {
+	blob = ast_json_pack("{s: []}", "args");
+	if (!blob) {
 		return -1;
 	}
 
 	/* Append arguments to args array */
-	json_args = ast_json_object_get(
-		ast_json_object_get(msg, "stasis-start"),
-		"args");
+	json_args = ast_json_object_get(blob, "args");
 	ast_assert(json_args != NULL);
 	for (i = 0; i < argc; ++i) {
 		int r = ast_json_array_append(json_args,
@@ -437,6 +454,11 @@
 		}
 	}
 
+	msg = stasis_json_event_stasis_start_create(snapshot, blob);
+	if (!msg) {
+		return -1;
+	}
+
 	app_send(app, msg);
 	return 0;
 }
@@ -453,7 +475,8 @@
 	if (snapshot == NULL) {
 		return -1;
 	}
-	msg = event_create("stasis-end", snapshot, NULL);
+
+	msg = stasis_json_event_stasis_end_create(snapshot);
 	if (!msg) {
 		return -1;
 	}
@@ -462,69 +485,200 @@
 	return 0;
 }
 
-static void dtmf_handler(struct app *app, struct ast_channel_blob *obj)
-{
-	RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
+static int app_watching_channel_cb(void *obj, void *arg, int flags)
+{
+	RAII_VAR(char *, uniqueid, NULL, ao2_cleanup);
+	struct app *app = obj;
+	char *chan_uniqueid = arg;
+
+	uniqueid = ao2_find(app->channels, chan_uniqueid, OBJ_KEY);
+	return uniqueid ? CMP_MATCH : 0;
+}
+
+static struct ao2_container *get_watching_apps(const char *uniqueid)
+{
+	struct ao2_container *watching_apps;
+	char *uniqueid_dup;
+	RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
+	ast_assert(uniqueid != NULL);
+
+	uniqueid_dup = ast_strdupa(uniqueid);
+
+	watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
+	watching_apps = watching_apps_iter->c;
+
+	if (!ao2_container_count(watching_apps)) {
+		return NULL;
+	}
+
+	ao2_ref(watching_apps, +1);
+	return watching_apps_iter->c;
+}
+
+/*! \brief Typedef for callbacks that get called on channel snapshot updates */
+typedef struct ast_json *(*channel_snapshot_monitor)(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot);
+
+/*! \brief Handle channel state changes */
+static struct ast_json *channel_state(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+	struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot;
+
+	if (!old_snapshot) {
+		return stasis_json_event_channel_created_create(snapshot);
+	} else if (!new_snapshot) {
+		json = ast_json_pack("{s: i, s: s}",
+			"cause", snapshot->hangupcause,
+			"cause_txt", ast_cause2str(snapshot->hangupcause));
+		if (!json) {
+			return NULL;
+		}
+		return stasis_json_event_channel_destroyed_create(snapshot, json);
+	} else if (old_snapshot->state != new_snapshot->state) {
+		return stasis_json_event_channel_state_change_create(snapshot);
+	}
+
+	return NULL;
+}
+
+static struct ast_json *channel_dialplan(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+	/* No Newexten event on cache clear */
+	if (!new_snapshot) {
+		return NULL;
+	}
+
+	/* Empty application is not valid for a Newexten event */
+	if (ast_strlen_zero(new_snapshot->appl)) {
+		return NULL;
+	}
+
+	if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
+		return NULL;
+	}
+
+	json = ast_json_pack("{s: s, s: s}",
+		"application", new_snapshot->appl,
+		"application_data", new_snapshot->data);
+	if (!json) {
+		return NULL;
+	}
+
+	return stasis_json_event_channel_dialplan_create(new_snapshot, json);
+}
+
+static struct ast_json *channel_callerid(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+	/* No NewCallerid event on cache clear or first event */
+	if (!old_snapshot || !new_snapshot) {
+		return NULL;
+	}
+
+	if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
+		return NULL;
+	}
+
+	json = ast_json_pack("{s: i, s: s}",
+		"caller_presentation", new_snapshot->caller_pres,
+		"caller_presentation_txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
+	if (!json) {
+		return NULL;
+	}
+
+	return stasis_json_event_channel_caller_id_create(new_snapshot, json);
+}
+
+static struct ast_json *channel_snapshot(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	if (!new_snapshot) {
+		return NULL;
+	}
+
+	return stasis_json_event_channel_snapshot_create(new_snapshot);
+}
+
+channel_snapshot_monitor channel_monitors[] = {
+	channel_snapshot,
+	channel_state,
+	channel_dialplan,
+	channel_callerid
+};
+
+static int app_send_cb(void *obj, void *arg, int flags)
+{
+	struct app *app = obj;
+	struct ast_json *msg = arg;
+
+	app_send(app, msg);
+	return 0;
+}
+
+static void sub_snapshot_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+	struct stasis_cache_update *update = stasis_message_data(message);
+	struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
+	struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+	int i;
+
+	watching_apps = get_watching_apps(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
+	if (!watching_apps) {
+		return;
+	}
+
+	for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
+		RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+
+		msg = channel_monitors[i](old_snapshot, new_snapshot);
+		if (msg) {
+			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+		}
+	}
+}
+
+
+static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
+{
+	ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
+}
+
+static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_handler_cb handler_cb)
+{
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-	const char *direction;
-
-	/* To simplify events, we'll only generate on receive */
-	direction = ast_json_string_get(
-		ast_json_object_get(obj->blob, "direction"));
-
-	if (strcmp("Received", direction) != 0) {
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+
+	if (!obj->snapshot) {
 		return;
 	}
 
-	extra = ast_json_pack(
-		"{s: o}",
-		"digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
-	if (!extra) {
+	watching_apps = get_watching_apps(obj->snapshot->uniqueid);
+	if (!watching_apps) {
 		return;
 	}
 
-	msg = event_create("dtmf-received", obj->snapshot, extra);
+	msg = handler_cb(obj);
 	if (!msg) {
 		return;
 	}
 
-	app_send(app, msg);
-}
-
-static void sub_handler(void *data, struct stasis_subscription *sub,
-			struct stasis_topic *topic,
-			struct stasis_message *message)
-{
-	struct app *app = data;
-
-	if (stasis_subscription_final_message(sub, message)) {
-		ao2_cleanup(data);
-		return;
-	}
-
-	if (ast_channel_snapshot_type() == stasis_message_type(message)) {
-		RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-		struct ast_channel_snapshot *snapshot =
-			stasis_message_data(message);
-
-		msg = event_create("channel-state-change", snapshot, NULL);
-		if (!msg) {
-			return;
-		}
-		app_send(app, msg);
-	} else if (ast_channel_dtmf_end_type() == stasis_message_type(message)) {
-		struct ast_channel_blob *blob = stasis_message_data(message);
-		dtmf_handler(app, blob);
-	}
-}
-
-struct stasis_subscription *app_subscribe(struct app *app,
-	struct stasis_topic *topic)
-{
-	if (app == NULL || topic == NULL) {
-		return NULL;
-	}
-	return stasis_subscribe(topic, sub_handler, app);
+	distribute_message(watching_apps, msg);
 }
 
 /*!
@@ -550,8 +704,6 @@
 
 	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
-	RAII_VAR(struct stasis_subscription *, subscription, NULL,
-		 stasis_unsubscribe);
 	int res = 0;
 
 	ast_assert(chan != NULL);
@@ -569,20 +721,17 @@
 		return -1;
 	}
 	ao2_link(app_controls, control);
-
-	subscription = app_subscribe(app, ast_channel_topic(chan));
-	if (subscription == NULL) {
-		ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
-			app_name, ast_channel_name(chan));
-		return -1;
-	}
-	ao2_ref(app, +1); /* subscription now has a reference */
 
 	res = app_send_start_msg(app, chan, argc, argv);
 	if (res != 0) {
 		ast_log(LOG_ERROR,
 			"Error sending start message to %s\n", app_name);
 		return res;
+	}
+
+	if (app_add_channel(app, chan)) {
+		ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
+		return -1;
 	}
 
 	while (!control_is_done(control)) {
@@ -633,6 +782,7 @@
 		}
 	}
 
+	app_remove_channel(app, chan);
 	res = app_send_end_msg(app, chan);
 	if (res != 0) {
 		ast_log(LOG_ERROR,
@@ -693,6 +843,82 @@
 	}
 }
 
+static struct ast_json *handle_blob_dtmf(struct ast_channel_blob *obj)
+{
+	RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+	const char *direction;
+
+	/* To simplify events, we'll only generate on receive */
+	direction = ast_json_string_get(
+		ast_json_object_get(obj->blob, "direction"));
+
+	if (strcmp("Received", direction) != 0) {
+		return NULL;
+	}
+
+	extra = ast_json_pack(
+		"{s: o}",
+		"digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
+	if (!extra) {
+		return NULL;
+	}
+
+	return stasis_json_event_channel_dtmf_received_create(obj->snapshot, extra);
+}
+
+/* To simplify events, we'll only generate on DTMF end (dtmf_end type) */
+static void sub_dtmf_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	struct ast_channel_blob *obj = stasis_message_data(message);
+	generic_blob_handler(obj, handle_blob_dtmf);
+}
+
+static struct ast_json *handle_blob_userevent(struct ast_channel_blob *obj)
+{
+	return stasis_json_event_channel_userevent_create(obj->snapshot, obj->blob);
+}
+
+static void sub_userevent_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	struct ast_channel_blob *obj = stasis_message_data(message);
+	generic_blob_handler(obj, handle_blob_userevent);
+}
+
+static struct ast_json *handle_blob_hangup_request(struct ast_channel_blob *obj)
+{
+	return stasis_json_event_channel_hangup_request_create(obj->snapshot, obj->blob);
+}
+
+static void sub_hangup_request_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	struct ast_channel_blob *obj = stasis_message_data(message);
+	generic_blob_handler(obj, handle_blob_hangup_request);
+}
+
+static struct ast_json *handle_blob_varset(struct ast_channel_blob *obj)
+{
+	return stasis_json_event_channel_varset_create(obj->snapshot, obj->blob);
+}
+
+static void sub_varset_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	struct ast_channel_blob *obj = stasis_message_data(message);
+	generic_blob_handler(obj, handle_blob_varset);
+}
+
 static int load_module(void)
 {
 	int r = 0;
@@ -709,12 +935,29 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	return r;
+	channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
+	if (!channel_router) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
+	r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_userevent_handler, NULL);
+	r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_varset_handler, NULL);
+	r |= stasis_message_router_add(channel_router, ast_channel_dtmf_begin_type(), sub_dtmf_handler, NULL);
+	r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_hangup_request_handler, NULL);
+	if (r) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
 	int r = 0;
+
+	stasis_message_router_unsubscribe(channel_router);
+	channel_router = NULL;
 
 	ao2_cleanup(apps_registry);
 	apps_registry = NULL;
@@ -732,11 +975,17 @@
 
 void app_update(struct app *app, stasis_app_cb handler, void *data)
 {
+	RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
 	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
 	SCOPED_AO2LOCK(lock, app);
 
-	msg = event_create("application-replaced", NULL, NULL);
-	app->handler(app->data, app->name, msg);
+	blob = ast_json_pack("{s: s}", "application", app->name);
+	if (blob) {
+		msg = stasis_json_event_application_replaced_create(blob);
+		if (msg) {
+			app->handler(app->data, app->name, msg);
+		}
+	}
 
 	app->handler = handler;
 	ao2_cleanup(app->data);

Modified: team/dlee/playback/res/res_stasis_http_events.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/playback/res/res_stasis_http_events.c?view=diff&rev=388300&r1=388299&r2=388300
==============================================================================
--- team/dlee/playback/res/res_stasis_http_events.c (original)
+++ team/dlee/playback/res/res_stasis_http_events.c Fri May 10 09:09:09 2013
@@ -42,6 +42,7 @@
 
 #include "asterisk/module.h"
 #include "stasis_http/resource_events.h"
+#include "asterisk/stasis_channels.h"
 
 /*!
  * \brief Parameter parsing callback for /events.
@@ -76,6 +77,588 @@
 	.children = {  }
 };
 
+struct ast_json *stasis_json_event_playback_finished_create(
+	struct ast_json *blob
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	struct ast_json *validator;
+
+	ast_assert(blob != NULL);
+	ast_assert(ast_json_object_get(blob, "type") == NULL);
+
+	validator = ast_json_object_get(blob, "playback");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	event = ast_json_deep_copy(blob);
+	if (!event) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "playback_finished", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+
+	return ast_json_ref(message);
+}
+
+struct ast_json *stasis_json_event_channel_snapshot_create(
+	struct ast_channel_snapshot *channel_snapshot
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	int ret;
+
+	ast_assert(channel_snapshot != NULL);
+
+	event = ast_json_object_create();
+	if (!event) {
+		return NULL;
+	}
+
+	ret = ast_json_object_set(event,
+		"channel", ast_channel_snapshot_to_json(channel_snapshot));
+	if (ret) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "channel_snapshot", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+
+	return ast_json_ref(message);
+}
+
+struct ast_json *stasis_json_event_channel_caller_id_create(
+	struct ast_channel_snapshot *channel_snapshot,
+	struct ast_json *blob
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	struct ast_json *validator;
+	int ret;
+
+	ast_assert(channel_snapshot != NULL);
+	ast_assert(blob != NULL);
+	ast_assert(ast_json_object_get(blob, "channel") == NULL);
+	ast_assert(ast_json_object_get(blob, "type") == NULL);
+
+	validator = ast_json_object_get(blob, "caller_presentation_txt");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	validator = ast_json_object_get(blob, "caller_presentation");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	event = ast_json_deep_copy(blob);
+	if (!event) {
+		return NULL;
+	}
+
+	ret = ast_json_object_set(event,
+		"channel", ast_channel_snapshot_to_json(channel_snapshot));
+	if (ret) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "channel_caller_id", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+
+	return ast_json_ref(message);
+}
+
+struct ast_json *stasis_json_event_playback_started_create(
+	struct ast_json *blob
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	struct ast_json *validator;
+
+	ast_assert(blob != NULL);
+	ast_assert(ast_json_object_get(blob, "type") == NULL);
+
+	validator = ast_json_object_get(blob, "playback");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	event = ast_json_deep_copy(blob);
+	if (!event) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "playback_started", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+
+	return ast_json_ref(message);
+}
+
+struct ast_json *stasis_json_event_application_replaced_create(
+	struct ast_json *blob
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	struct ast_json *validator;
+
+	ast_assert(blob != NULL);
+	ast_assert(ast_json_object_get(blob, "type") == NULL);
+
+	validator = ast_json_object_get(blob, "application");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	event = ast_json_deep_copy(blob);
+	if (!event) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "application_replaced", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+
+	return ast_json_ref(message);
+}
+
+struct ast_json *stasis_json_event_channel_destroyed_create(
+	struct ast_channel_snapshot *channel_snapshot,
+	struct ast_json *blob
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	struct ast_json *validator;
+	int ret;
+
+	ast_assert(channel_snapshot != NULL);
+	ast_assert(blob != NULL);
+	ast_assert(ast_json_object_get(blob, "channel") == NULL);
+	ast_assert(ast_json_object_get(blob, "type") == NULL);
+
+	validator = ast_json_object_get(blob, "cause");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	validator = ast_json_object_get(blob, "cause_txt");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	event = ast_json_deep_copy(blob);
+	if (!event) {
+		return NULL;
+	}
+
+	ret = ast_json_object_set(event,
+		"channel", ast_channel_snapshot_to_json(channel_snapshot));
+	if (ret) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "channel_destroyed", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+
+	return ast_json_ref(message);
+}
+
+struct ast_json *stasis_json_event_channel_varset_create(
+	struct ast_channel_snapshot *channel_snapshot,
+	struct ast_json *blob
+	)
+{
+	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
+	RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+	struct ast_json *validator;
+	int ret;
+
+	ast_assert(channel_snapshot != NULL);
+	ast_assert(blob != NULL);
+	ast_assert(ast_json_object_get(blob, "channel") == NULL);
+	ast_assert(ast_json_object_get(blob, "type") == NULL);
+
+	validator = ast_json_object_get(blob, "variable");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	validator = ast_json_object_get(blob, "value");
+	if (validator) {
+		/* do validation? XXX */
+	} else {
+		/* fail message generation if the required parameter doesn't exist */
+		return NULL;
+	}
+
+	event = ast_json_deep_copy(blob);
+	if (!event) {
+		return NULL;
+	}
+
+	ret = ast_json_object_set(event,
+		"channel", ast_channel_snapshot_to_json(channel_snapshot));
+	if (ret) {
+		return NULL;
+	}
+
+	message = ast_json_pack("{s: o}", "channel_varset", ast_json_ref(event));
+	if (!message) {
+		return NULL;
+	}
+

[... 1301 lines stripped ...]
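
Editorial note: the generated stasis_json_event_*_create() functions above share one pattern: assert the inputs, fail with NULL when a required blob key is missing, and only then build the event. Below is a minimal sketch of the required-key check alone. It uses plain C stand-ins rather than the Asterisk JSON API; struct field, struct blob, blob_get() and blob_has_required() are hypothetical names invented for illustration and do not appear in this commit.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a JSON object: a flat list of string fields. */
struct field {
	const char *key;
	const char *value;
};

struct blob {
	const struct field *fields;
	size_t count;
};

/* Look up a key; returns its value, or NULL when the key is absent. */
static const char *blob_get(const struct blob *blob, const char *key)
{
	size_t i;

	assert(blob != NULL);
	assert(key != NULL);

	for (i = 0; i < blob->count; i++) {
		if (strcmp(blob->fields[i].key, key) == 0) {
			return blob->fields[i].value;
		}
	}
	return NULL;
}

/*
 * Returns 1 when every required key is present in the blob, 0 otherwise.
 * A caller following the pattern in the diff would refuse to generate
 * the event when this returns 0.
 */
static int blob_has_required(const struct blob *blob,
	const char *const *required, size_t required_count)
{
	size_t i;

	assert(blob != NULL);

	for (i = 0; i < required_count; i++) {
		if (!blob_get(blob, required[i])) {
			return 0;
		}
	}
	return 1;
}
```

For a channel_varset-style event the required list would be "variable" and "value". A blob carrying only one of them fails the check, matching the early "return NULL" branches in the generated code.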


