[asterisk-commits] kmoore: branch kmoore/stasis-bridge_events r385857 - in /team/kmoore/stasis-b...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Apr 16 09:57:25 CDT 2013


Author: kmoore
Date: Tue Apr 16 09:57:22 2013
New Revision: 385857

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=385857
Log:
Fix merge oops and add bridge merge message handling for res_stasis applications

Modified:
    team/kmoore/stasis-bridge_events/main/stasis_bridging.c
    team/kmoore/stasis-bridge_events/res/res_stasis.c

Modified: team/kmoore/stasis-bridge_events/main/stasis_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/main/stasis_bridging.c?view=diff&rev=385857&r1=385856&r2=385857
==============================================================================
--- team/kmoore/stasis-bridge_events/main/stasis_bridging.c (original)
+++ team/kmoore/stasis-bridge_events/main/stasis_bridging.c Tue Apr 16 09:57:22 2013
@@ -145,6 +145,20 @@
 	stasis_publish(ast_bridge_topic(bridge), msg);
 }
 
+static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
+{
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+	ast_assert(obj != NULL);
+
+	msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
+	if (!msg) {
+		return;
+	}
+
+	stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
+}
+
 struct stasis_message_type *ast_bridge_merge_message_type(void)
 {
 	return bridge_merge_message_type;
@@ -282,8 +296,6 @@
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_json *, enter_json, NULL, ast_json_unref);
 
-	ast_bridge_publish_state(bridge);
-
 	enter_json = ast_json_pack("{s: s}",
 			"type", "enter");
 
@@ -292,7 +304,9 @@
 		return;
 	}
 
+	/* enter blob first, then state */
 	stasis_publish(ast_bridge_topic(bridge), msg);
+	bridge_publish_state_from_blob(stasis_message_data(msg));
 }
 
 void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
@@ -300,7 +314,6 @@
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_json *, leave_json, NULL, ast_json_unref);
 
-	ast_bridge_publish_state(bridge);
 
 	leave_json = ast_json_pack("{s: s}",
 			"type", "leave");
@@ -310,6 +323,8 @@
 		return;
 	}
 
+	/* state first, then leave blob (opposite of enter, preserves nesting of events) */
+	bridge_publish_state_from_blob(stasis_message_data(msg));
 	stasis_publish(ast_bridge_topic(bridge), msg);
 }
 

Modified: team/kmoore/stasis-bridge_events/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/res/res_stasis.c?view=diff&rev=385857&r1=385856&r2=385857
==============================================================================
--- team/kmoore/stasis-bridge_events/res/res_stasis.c (original)
+++ team/kmoore/stasis-bridge_events/res/res_stasis.c Tue Apr 16 09:57:22 2013
@@ -1004,17 +1004,11 @@
 	struct stasis_cache_update *update = stasis_message_data(message);
 	struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
 	struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
-	RAII_VAR(char *, entering_chan, NULL, ao2_cleanup);
-	RAII_VAR(char *, leaving_chan, NULL, ao2_cleanup);
 	RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
 	char *bridge_uniqueid = ast_strdupa(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
 	int i;
 
-	/* update bridge_info for added channels if there is interest */ 
-	if (new_snapshot && old_snapshot) {
-		entering_chan = ao2_callback(new_snapshot->channels, 0, find_diff, old_snapshot->channels);
-	}
-	info = bridge_info_create_or_update(bridge_uniqueid, entering_chan);
+	info = bridge_info_create_or_update(bridge_uniqueid, NULL);
 	if (!info) {
 		/* no interest in this bridge */
 		return;
@@ -1023,7 +1017,7 @@
 	/* get list of apps that are interested in this bridge event */
 	watching_apps = get_bridge_watching_apps(info);
 	if (!watching_apps) {
-		goto bridge_handler_cleanup;
+		return;
 	}
 
 	for (i = 0; i < ARRAY_LEN(bridge_monitors); ++i) {
@@ -1034,13 +1028,82 @@
 			ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
 		}
 	}
-
-bridge_handler_cleanup:
-	/* update bridge_info for removed channels if there is interest */ 
-	if (new_snapshot && old_snapshot) {
-		leaving_chan = ao2_callback(old_snapshot->channels, 0, find_diff, new_snapshot->channels);
-		ao2_find(info->channels, leaving_chan, OBJ_UNLINK | OBJ_NODATA);
-	}
+}
+
+static int list_merge_cb(void *obj, void *arg, int flags)
+{
+	/* remove any current entries for this app */
+	ao2_find(arg, obj, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
+	/* relink as the only entry */
+	ao2_link(arg, obj);
+	return 0;
+}
+
+static struct ao2_container *get_watching_merge(struct ast_bridge_merge_message *merge)
+{
+	RAII_VAR(struct ao2_container *, watching_to, NULL, ao2_cleanup);
+	RAII_VAR(struct ao2_container *, watching_from, NULL, ao2_cleanup);
+	RAII_VAR(struct bridge_info *, info_to, NULL, ao2_cleanup);
+	RAII_VAR(struct bridge_info *, info_from, NULL, ao2_cleanup);
+	char *to_uniqueid = ast_strdupa(merge->to->uniqueid);
+	char *from_uniqueid = ast_strdupa(merge->from->uniqueid);
+	struct ao2_container *watching_combined;
+
+	info_to = bridge_info_create_or_update(to_uniqueid, NULL);
+	info_from = bridge_info_create_or_update(from_uniqueid, NULL);
+	if (!info_to && !info_from) {
+		/* no interest in this merge */
+		return NULL;
+	}
+
+	if (info_to) {
+		watching_to = get_bridge_watching_apps(info_to);
+		if (!watching_to) {
+			return NULL;
+		}
+	}
+	if (info_from) {
+		watching_from = get_bridge_watching_apps(info_from);
+		if (!watching_from) {
+			return NULL;
+		}
+	}
+
+	/* handle the merge of the lists */
+	if (info_to && !info_from) {
+		watching_combined = watching_to;
+	} else if (!info_to && info_from) {
+		watching_combined = watching_from;
+	} else {
+		/* apps are watching both bridges */
+		watching_combined = watching_to;
+		ao2_callback(watching_from, OBJ_NODATA, list_merge_cb, watching_to);
+	}
+
+	ao2_ref(watching_combined, +1);
+	return watching_combined;
+}
+
+static void bridge_merge_cb(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+	struct ast_bridge_merge_message *merge = stasis_message_data(message);
+
+	watching_apps = get_watching_merge(merge);
+
+	msg = ast_json_pack("{s: {s: o, s: o}}",
+		"bridge-event-merge",
+			"to", ast_bridge_snapshot_to_json(merge->to),
+			"from", ast_bridge_snapshot_to_json(merge->from));
+	if (!msg) {
+		return;
+	}
+
+	ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
 }
 
 /*!
@@ -1292,8 +1355,8 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	ret |= stasis_message_router_add(app_channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
-	ret |= stasis_message_router_add(app_channel_router, ast_channel_blob_type(), sub_blob_handler, NULL);
+	ret |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
+	ret |= stasis_message_router_add(channel_router, ast_channel_blob_type(), sub_blob_handler, NULL);
 	if (ret) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
@@ -1313,6 +1376,11 @@
 		stasis_cache_update_type(),
 		bridge_state_cb,
 		NULL);
+	ret |= stasis_message_router_add(
+		bridge_router,
+		ast_bridge_merge_message_type(),
+		bridge_merge_cb,
+		NULL);
 	if (ret) {
 		return AST_MODULE_LOAD_FAILURE;
 	}




More information about the asterisk-commits mailing list