[asterisk-commits] kmoore: branch kmoore/stasis-bridge_events r385913 - /team/kmoore/stasis-brid...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Apr 16 15:33:19 CDT 2013


Author: kmoore
Date: Tue Apr 16 15:33:17 2013
New Revision: 385913

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=385913
Log:
Rework channel/bridge enter/leave events to use bridge blob messages instead of recreating the information by diffing bridge snapshots.

Modified:
    team/kmoore/stasis-bridge_events/res/res_stasis.c

Modified: team/kmoore/stasis-bridge_events/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/res/res_stasis.c?view=diff&rev=385913&r1=385912&r2=385913
==============================================================================
--- team/kmoore/stasis-bridge_events/res/res_stasis.c (original)
+++ team/kmoore/stasis-bridge_events/res/res_stasis.c Tue Apr 16 15:33:17 2013
@@ -62,7 +62,7 @@
 #define APP_CHANNELS_BUCKETS 7
 
 /*!
- * \brief Number of buckets for the blob_handlers container.  Remember to keep
+ * \brief Number of buckets for the *_blob_handlers containers.  Remember to keep
  * it a prime number!
  */
 #define BLOB_HANDLER_BUCKETS 7
@@ -79,7 +79,10 @@
 struct ao2_container *__app_controls;
 
 /*! \brief Container for handlers for channel blob messages */
-struct ao2_container *blob_handlers;
+struct ao2_container *channel_blob_handlers;
+
+/*! \brief Container for handlers for bridge blob messages */
+struct ao2_container *bridge_blob_handlers;
 
 /*! \brief Message router for the channel caching topic */
 struct stasis_message_router *channel_router;
@@ -415,12 +418,15 @@
 }
 
 /*! \brief Typedef for blob handler callbacks */
-typedef struct ast_json *(*blob_handler_cb)(struct ast_channel_blob *);
+typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
+
+/*! \brief Typedef for bridge blob handler callbacks */
+typedef struct ast_json *(*bridge_blob_handler_cb)(struct ast_bridge_blob *);
 
 /*! \brief AO2 refcounted object linking channel blob json type to its handler callback */
 struct blob_handler {
 	char *type;
-	blob_handler_cb handler;
+	void *handler;
 };
 
 static void blob_handler_dtor(void *obj)
@@ -722,6 +728,11 @@
 	return 0;
 }
 
+static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
+{
+	ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
+}
+
 static void sub_snapshot_handler(void *data,
 		struct stasis_subscription *sub,
 		struct stasis_topic *topic,
@@ -743,27 +754,39 @@
 
 		msg = channel_monitors[i](old_snapshot, new_snapshot);
 		if (msg) {
-			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+			distribute_message(watching_apps, msg);
 		}
 	}
 }
 
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
-	ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static struct ast_json *handle_blob(struct ast_channel_blob *obj)
+static struct ast_json *handle_channel_blob(struct ast_channel_blob *obj)
 {
 	RAII_VAR(struct blob_handler *, handler, NULL, ao2_cleanup);
 	const char *handler_type = ast_channel_blob_json_type(obj);
-
-	handler = ao2_find(blob_handlers, handler_type, OBJ_KEY);
+	channel_blob_handler_cb handler_cb;
+
+	handler = ao2_find(channel_blob_handlers, handler_type, OBJ_KEY);
 	if (!handler) {
 		return NULL;
 	}
 
-	return handler->handler(obj);
+	handler_cb = handler->handler;
+	return handler_cb(obj);
+}
+
+static struct ast_json *handle_bridge_blob(struct ast_bridge_blob *obj)
+{
+	RAII_VAR(struct blob_handler *, handler, NULL, ao2_cleanup);
+	const char *handler_type = ast_bridge_blob_json_type(obj);
+	bridge_blob_handler_cb handler_cb;
+
+	handler = ao2_find(bridge_blob_handlers, handler_type, OBJ_KEY);
+	if (!handler) {
+		return NULL;
+	}
+
+	handler_cb = handler->handler;
+	return handler_cb(obj);
 }
 
 static void sub_blob_handler(void *data,
@@ -784,7 +807,7 @@
 		return;
 	}
 
-	msg = handle_blob(obj);
+	msg = handle_channel_blob(obj);
 	if (!msg) {
 		return;
 	}
@@ -792,18 +815,6 @@
 	distribute_message(watching_apps, msg);
 }
 
-static int find_diff(void *uniqueid, void *arg, int flags)
-{
-	struct ao2_container *secondary = arg;
-	RAII_VAR(char *, ao2_uniqueid, NULL, ao2_cleanup);
-
-	ao2_uniqueid = ao2_find(secondary, uniqueid, OBJ_KEY);
-	if (!ao2_uniqueid) {
-		return CMP_MATCH | CMP_STOP;
-	}
-
-	return 0;
-}
 static struct bridge_info *bridge_info_create_or_update(char *bridge_uniqueid, char *entering_chan)
 {
 	RAII_VAR(struct ao2_container *, entering_apps, NULL, ao2_cleanup);
@@ -866,11 +877,19 @@
 
 /* find all apps where the intersection of the app channels of interest
  * set and the bridge stasis channels set results in a non-empty set */
-static struct ao2_container *get_bridge_watching_apps(struct bridge_info *info)
+static struct ao2_container *get_bridge_watching_apps(struct ast_bridge_snapshot *bridge)
 {
 	RAII_VAR(struct ao2_container *, out, NULL, ao2_cleanup);
 	RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
-	RAII_VAR(struct ao2_iterator *,callback_iter, NULL, ao2_iterator_destroy);
+	RAII_VAR(struct ao2_iterator *, callback_iter, NULL, ao2_iterator_destroy);
+	RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
+	char *bridge_uniqueid = ast_strdupa(bridge->uniqueid);
+
+	info = bridge_info_create_or_update(bridge_uniqueid, NULL);
+	if (!info) {
+		/* no interest in this bridge */
+		return NULL;
+	}
 
 	callback_iter = ao2_callback(apps, OBJ_MULTIPLE, check_app_intersect_cb, info->channels);
 	out = callback_iter->c;
@@ -886,6 +905,7 @@
 static struct ast_json *app_bridge_event_create(
 	const char *event_name,
 	const struct ast_bridge_snapshot *snapshot,
+	const struct ast_channel_snapshot *chan_snapshot,
 	const struct ast_json *extra_info)
 {
 	RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
@@ -911,6 +931,20 @@
 		}
 	}
 
+	if (chan_snapshot) {
+		int ret;
+
+		/* Mustn't already have a channel field */
+		ast_assert(ast_json_object_get(event, "channel") == NULL);
+
+		ret = ast_json_object_set(
+			event,
+			"channel", ast_channel_snapshot_to_json(chan_snapshot));
+		if (ret != 0) {
+			return NULL;
+		}
+	}
+
 	message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
 
 	return ast_json_ref(message);
@@ -929,7 +963,7 @@
 		return NULL;
 	}
 
-	return app_bridge_event_create("bridge-snapshot", new_snapshot, NULL);
+	return app_bridge_event_create("bridge-snapshot", new_snapshot, NULL, NULL);
 }
 
 /*! \brief Handle bridge creation */
@@ -941,7 +975,7 @@
 		return NULL;
 	}
 
-	return app_bridge_event_create("bridge-event-create", new_snapshot, NULL);
+	return app_bridge_event_create("bridge-event-create", new_snapshot, NULL, NULL);
 }
 
 /*! \brief Handle bridge destruction */
@@ -953,46 +987,13 @@
 		return NULL;
 	}
 
-	return app_bridge_event_create("bridge-event-destroy", old_snapshot, NULL);
-}
-
-/*! \brief Handle channels entering and leaving */
-static struct ast_json *bridge_channel_diff(
-	struct ast_bridge_snapshot *old_snapshot,
-	struct ast_bridge_snapshot *new_snapshot)
-{
-	RAII_VAR(char *, new_diff, NULL, ao2_cleanup);
-	RAII_VAR(char *, old_diff, NULL, ao2_cleanup);
-	RAII_VAR(struct ast_json *, uniqueid_json, NULL, ast_json_unref);
-	char *event;
-
-	if (!new_snapshot || !old_snapshot
-		|| ao2_container_count(new_snapshot->channels)
-			== ao2_container_count(old_snapshot->channels)) {
-		return NULL;
-	}
-
-	/* get the diff channel for new and old snapshot */
-	new_diff = ao2_callback(new_snapshot->channels, OBJ_NODATA, find_diff, old_snapshot->channels);
-	old_diff = ao2_callback(old_snapshot->channels, OBJ_NODATA, find_diff, new_snapshot->channels);
-	if (new_diff) {
-		event = "bridge-event-enter";
-		uniqueid_json = ast_json_pack("s: s", "uniqueid", new_diff);
-	} else if (old_diff) {
-		event = "bridge-event-leave";
-		uniqueid_json = ast_json_pack("s: s", "uniqueid", old_diff);
-	} else {
-		return NULL;
-	}
-
-	return app_bridge_event_create(event, old_snapshot, uniqueid_json);
+	return app_bridge_event_create("bridge-event-destroy", old_snapshot, NULL, NULL);
 }
 
 bridge_snapshot_monitor bridge_monitors[] = {
 	bridge_snapshot,
 	bridge_create,
 	bridge_destroy,
-	bridge_channel_diff,
 };
 
 static void bridge_state_cb(void *data,
@@ -1004,18 +1005,10 @@
 	struct stasis_cache_update *update = stasis_message_data(message);
 	struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
 	struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-	RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
-	char *bridge_uniqueid = ast_strdupa(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
 	int i;
 
-	info = bridge_info_create_or_update(bridge_uniqueid, NULL);
-	if (!info) {
-		/* no interest in this bridge */
-		return;
-	}
-
 	/* get list of apps that are interested in this bridge event */
-	watching_apps = get_bridge_watching_apps(info);
+	watching_apps = get_bridge_watching_apps(new_snapshot ? new_snapshot : old_snapshot);
 	if (!watching_apps) {
 		return;
 	}
@@ -1025,9 +1018,44 @@
 
 		msg = bridge_monitors[i](old_snapshot, new_snapshot);
 		if (msg) {
-			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+			distribute_message(watching_apps, msg);
 		}
 	}
+}
+
+/*! \brief Handle bridge blobs */
+static void bridge_blob_cb(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+	struct ast_bridge_blob *blob = stasis_message_data(message);
+	const char *type = ast_bridge_blob_json_type(blob);
+
+	/* for anything but a channel entry, build the watching app list now; an entry delays it until the handler has run */
+	if (strcmp("enter", type)) {
+		watching_apps = get_bridge_watching_apps(blob->bridge);
+		if (!watching_apps) {
+			return;
+		}
+	}
+
+	msg = handle_bridge_blob(blob);
+	if (!msg) {
+		return;
+	}
+
+	/* handle watching_apps list generation now that interested apps have been updated */
+	if (!watching_apps) {
+		watching_apps = get_bridge_watching_apps(blob->bridge);
+		if (!watching_apps) {
+			return;
+		}
+	}
+
+	distribute_message(watching_apps, msg);
 }
 
 static int list_merge_cb(void *obj, void *arg, int flags)
@@ -1043,36 +1071,15 @@
 {
 	RAII_VAR(struct ao2_container *, watching_to, NULL, ao2_cleanup);
 	RAII_VAR(struct ao2_container *, watching_from, NULL, ao2_cleanup);
-	RAII_VAR(struct bridge_info *, info_to, NULL, ao2_cleanup);
-	RAII_VAR(struct bridge_info *, info_from, NULL, ao2_cleanup);
-	char *to_uniqueid = ast_strdupa(merge->to->uniqueid);
-	char *from_uniqueid = ast_strdupa(merge->from->uniqueid);
 	struct ao2_container *watching_combined;
 
-	info_to = bridge_info_create_or_update(to_uniqueid, NULL);
-	info_from = bridge_info_create_or_update(from_uniqueid, NULL);
-	if (!info_to && !info_from) {
-		/* no interest in this merge */
-		return NULL;
-	}
-
-	if (info_to) {
-		watching_to = get_bridge_watching_apps(info_to);
-		if (!watching_to) {
-			return NULL;
-		}
-	}
-	if (info_from) {
-		watching_from = get_bridge_watching_apps(info_from);
-		if (!watching_from) {
-			return NULL;
-		}
-	}
+	watching_to = get_bridge_watching_apps(merge->to);
+	watching_from = get_bridge_watching_apps(merge->from);
 
 	/* handle the merge of the lists */
-	if (info_to && !info_from) {
+	if (watching_to && !watching_from) {
 		watching_combined = watching_to;
-	} else if (!info_to && info_from) {
+	} else if (!watching_to && watching_from) {
 		watching_combined = watching_from;
 	} else {
 		/* apps are watching both bridges */
@@ -1103,7 +1110,7 @@
 		return;
 	}
 
-	ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+	distribute_message(watching_apps, msg);
 }
 
 /*!
@@ -1285,6 +1292,23 @@
 	return app_channel_event_create("channel-event-dtmf-received", obj->snapshot, extra);
 }
 
+static struct ast_json *handle_blob_enter(struct ast_bridge_blob *obj)
+{
+	RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
+	char *bridge_uniqueid = ast_strdupa(obj->bridge->uniqueid);
+	char *channel_uniqueid = ast_strdupa(obj->channel->uniqueid);
+
+	/* force the bridge-of-interest channel-of-interest listing to update for the new channel */
+	info = bridge_info_create_or_update(bridge_uniqueid, channel_uniqueid);
+
+	return app_bridge_event_create("bridge-event-enter", obj->bridge, obj->channel, NULL);
+}
+
+static struct ast_json *handle_blob_leave(struct ast_bridge_blob *obj)
+{
+	return app_bridge_event_create("bridge-event-leave", obj->bridge, obj->channel, NULL);
+}
+
 static struct ast_json *handle_blob_generic(struct ast_channel_blob *obj)
 {
 	RAII_VAR(struct ast_str *, event_name, ast_str_create(32), ast_free);
@@ -1294,7 +1318,7 @@
 	return app_channel_event_create(ast_str_buffer(event_name), obj->snapshot, obj->blob);
 }
 
-static void register_blob_handler(const char *blob_type, blob_handler_cb blob_type_handler_cb)
+static void register_blob_handler(struct ao2_container *handlers, const char *blob_type, void *handler_cb)
 {
 	RAII_VAR(struct blob_handler *, handler, ao2_alloc(sizeof(*handler), blob_handler_dtor), ao2_cleanup);
 
@@ -1302,13 +1326,13 @@
 		return;
 	}
 
-	handler->handler = blob_type_handler_cb;
+	handler->handler = handler_cb;
 	handler->type = ast_strdup(blob_type);
 	if (!handler->type) {
 		return;
 	}
 
-	ao2_link(blob_handlers, handler);
+	ao2_link(handlers, handler);
 }
 
 static int blob_handler_hash(const void *obj, const int flags)
@@ -1340,15 +1364,15 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	blob_handlers = ao2_container_alloc(BLOB_HANDLER_BUCKETS, blob_handler_hash, blob_handler_cmp);
-	if (!blob_handlers) {
+	channel_blob_handlers = ao2_container_alloc(BLOB_HANDLER_BUCKETS, blob_handler_hash, blob_handler_cmp);
+	if (!channel_blob_handlers) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	register_blob_handler("userevent", handle_blob_generic);
-	register_blob_handler("hangup_request", handle_blob_generic);
-	register_blob_handler("varset", handle_blob_generic);
-	register_blob_handler("dtmf_end", handle_blob_dtmf);
+	register_blob_handler(channel_blob_handlers, "userevent", handle_blob_generic);
+	register_blob_handler(channel_blob_handlers, "hangup_request", handle_blob_generic);
+	register_blob_handler(channel_blob_handlers, "varset", handle_blob_generic);
+	register_blob_handler(channel_blob_handlers, "dtmf_end", handle_blob_dtmf);
 
 	channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
 	if (!channel_router) {
@@ -1366,6 +1390,14 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
+	bridge_blob_handlers = ao2_container_alloc(BLOB_HANDLER_BUCKETS, blob_handler_hash, blob_handler_cmp);
+	if (!bridge_blob_handlers) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+
+	register_blob_handler(bridge_blob_handlers, "enter", handle_blob_enter);
+	register_blob_handler(bridge_blob_handlers, "leave", handle_blob_leave);
+
 	bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached()));
 	if (!bridge_router) {
 		return AST_MODULE_LOAD_FAILURE;
@@ -1375,6 +1407,11 @@
 		bridge_router,
 		stasis_cache_update_type(),
 		bridge_state_cb,
+		NULL);
+	ret |= stasis_message_router_add(
+		bridge_router,
+		ast_bridge_blob_type(),
+		bridge_blob_cb,
 		NULL);
 	ret |= stasis_message_router_add(
 		bridge_router,
@@ -1395,8 +1432,11 @@
 	stasis_message_router_unsubscribe(channel_router);
 	channel_router = NULL;
 
-	ao2_cleanup(blob_handlers);
-	blob_handlers = NULL;
+	ao2_cleanup(channel_blob_handlers);
+	channel_blob_handlers = NULL;
+
+	ao2_cleanup(bridge_blob_handlers);
+	bridge_blob_handlers = NULL;
 
 	stasis_message_router_unsubscribe(bridge_router);
 	bridge_router = NULL;




More information about the asterisk-commits mailing list