[asterisk-commits] kmoore: branch kmoore/stasis-bridge_events r385857 - in /team/kmoore/stasis-b...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Apr 16 09:57:25 CDT 2013
Author: kmoore
Date: Tue Apr 16 09:57:22 2013
New Revision: 385857
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=385857
Log:
Fix merge oops and add bridge merge message handling for res_stasis applications
Modified:
team/kmoore/stasis-bridge_events/main/stasis_bridging.c
team/kmoore/stasis-bridge_events/res/res_stasis.c
Modified: team/kmoore/stasis-bridge_events/main/stasis_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/main/stasis_bridging.c?view=diff&rev=385857&r1=385856&r2=385857
==============================================================================
--- team/kmoore/stasis-bridge_events/main/stasis_bridging.c (original)
+++ team/kmoore/stasis-bridge_events/main/stasis_bridging.c Tue Apr 16 09:57:22 2013
@@ -145,6 +145,20 @@
stasis_publish(ast_bridge_topic(bridge), msg);
}
+static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
+{
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ ast_assert(obj != NULL);
+
+ msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
+}
+
struct stasis_message_type *ast_bridge_merge_message_type(void)
{
return bridge_merge_message_type;
@@ -282,8 +296,6 @@
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, enter_json, NULL, ast_json_unref);
- ast_bridge_publish_state(bridge);
-
enter_json = ast_json_pack("{s: s}",
"type", "enter");
@@ -292,7 +304,9 @@
return;
}
+ /* enter blob first, then state */
stasis_publish(ast_bridge_topic(bridge), msg);
+ bridge_publish_state_from_blob(stasis_message_data(msg));
}
void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
@@ -300,7 +314,6 @@
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, leave_json, NULL, ast_json_unref);
- ast_bridge_publish_state(bridge);
leave_json = ast_json_pack("{s: s}",
"type", "leave");
@@ -310,6 +323,8 @@
return;
}
+ /* state first, then leave blob (opposite of enter, preserves nesting of events) */
+ bridge_publish_state_from_blob(stasis_message_data(msg));
stasis_publish(ast_bridge_topic(bridge), msg);
}
Modified: team/kmoore/stasis-bridge_events/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/res/res_stasis.c?view=diff&rev=385857&r1=385856&r2=385857
==============================================================================
--- team/kmoore/stasis-bridge_events/res/res_stasis.c (original)
+++ team/kmoore/stasis-bridge_events/res/res_stasis.c Tue Apr 16 09:57:22 2013
@@ -1004,17 +1004,11 @@
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
- RAII_VAR(char *, entering_chan, NULL, ao2_cleanup);
- RAII_VAR(char *, leaving_chan, NULL, ao2_cleanup);
RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
char *bridge_uniqueid = ast_strdupa(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
int i;
- /* update bridge_info for added channels if there is interest */
- if (new_snapshot && old_snapshot) {
- entering_chan = ao2_callback(new_snapshot->channels, 0, find_diff, old_snapshot->channels);
- }
- info = bridge_info_create_or_update(bridge_uniqueid, entering_chan);
+ info = bridge_info_create_or_update(bridge_uniqueid, NULL);
if (!info) {
/* no interest in this bridge */
return;
@@ -1023,7 +1017,7 @@
/* get list of apps that are interested in this bridge event */
watching_apps = get_bridge_watching_apps(info);
if (!watching_apps) {
- goto bridge_handler_cleanup;
+ return;
}
for (i = 0; i < ARRAY_LEN(bridge_monitors); ++i) {
@@ -1034,13 +1028,82 @@
ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
}
-
-bridge_handler_cleanup:
- /* update bridge_info for removed channels if there is interest */
- if (new_snapshot && old_snapshot) {
- leaving_chan = ao2_callback(old_snapshot->channels, 0, find_diff, new_snapshot->channels);
- ao2_find(info->channels, leaving_chan, OBJ_UNLINK | OBJ_NODATA);
- }
+}
+
+static int list_merge_cb(void *obj, void *arg, int flags)
+{
+ /* remove any current entries for this app */
+ ao2_find(arg, obj, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
+ /* relink as the only entry */
+ ao2_link(arg, obj);
+ return 0;
+}
+
+static struct ao2_container *get_watching_merge(struct ast_bridge_merge_message *merge)
+{
+ RAII_VAR(struct ao2_container *, watching_to, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, watching_from, NULL, ao2_cleanup);
+ RAII_VAR(struct bridge_info *, info_to, NULL, ao2_cleanup);
+ RAII_VAR(struct bridge_info *, info_from, NULL, ao2_cleanup);
+ char *to_uniqueid = ast_strdupa(merge->to->uniqueid);
+ char *from_uniqueid = ast_strdupa(merge->from->uniqueid);
+ struct ao2_container *watching_combined;
+
+ info_to = bridge_info_create_or_update(to_uniqueid, NULL);
+ info_from = bridge_info_create_or_update(from_uniqueid, NULL);
+ if (!info_to && !info_from) {
+ /* no interest in this merge */
+ return NULL;
+ }
+
+ if (info_to) {
+ watching_to = get_bridge_watching_apps(info_to);
+ if (!watching_to) {
+ return NULL;
+ }
+ }
+ if (info_from) {
+ watching_from = get_bridge_watching_apps(info_from);
+ if (!watching_from) {
+ return NULL;
+ }
+ }
+
+ /* handle the merge of the lists */
+ if (info_to && !info_from) {
+ watching_combined = watching_to;
+ } else if (!info_to && info_from) {
+ watching_combined = watching_from;
+ } else {
+ /* apps are watching both bridges */
+ watching_combined = watching_to;
+ ao2_callback(watching_from, OBJ_NODATA, list_merge_cb, watching_to);
+ }
+
+ ao2_ref(watching_combined, +1);
+ return watching_combined;
+}
+
+static void bridge_merge_cb(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+ struct ast_bridge_merge_message *merge = stasis_message_data(message);
+
+ watching_apps = get_watching_merge(merge);
+
+ msg = ast_json_pack("{s: {s: o, s: o}}",
+ "bridge-event-merge",
+ "to", ast_bridge_snapshot_to_json(merge->to),
+ "from", ast_bridge_snapshot_to_json(merge->from));
+ if (!msg) {
+ return;
+ }
+
+ ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
/*!
@@ -1292,8 +1355,8 @@
return AST_MODULE_LOAD_FAILURE;
}
- ret |= stasis_message_router_add(app_channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
- ret |= stasis_message_router_add(app_channel_router, ast_channel_blob_type(), sub_blob_handler, NULL);
+ ret |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
+ ret |= stasis_message_router_add(channel_router, ast_channel_blob_type(), sub_blob_handler, NULL);
if (ret) {
return AST_MODULE_LOAD_FAILURE;
}
@@ -1313,6 +1376,11 @@
stasis_cache_update_type(),
bridge_state_cb,
NULL);
+ ret |= stasis_message_router_add(
+ bridge_router,
+ ast_bridge_merge_message_type(),
+ bridge_merge_cb,
+ NULL);
if (ret) {
return AST_MODULE_LOAD_FAILURE;
}
More information about the asterisk-commits
mailing list