[asterisk-commits] kmoore: branch kmoore/stasis-bridge_events r385913 - /team/kmoore/stasis-brid...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Apr 16 15:33:19 CDT 2013
Author: kmoore
Date: Tue Apr 16 15:33:17 2013
New Revision: 385913
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=385913
Log:
Rework channel/bridge enter/leave events to use bridge blob messages instead of recreating the information
Modified:
team/kmoore/stasis-bridge_events/res/res_stasis.c
Modified: team/kmoore/stasis-bridge_events/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridge_events/res/res_stasis.c?view=diff&rev=385913&r1=385912&r2=385913
==============================================================================
--- team/kmoore/stasis-bridge_events/res/res_stasis.c (original)
+++ team/kmoore/stasis-bridge_events/res/res_stasis.c Tue Apr 16 15:33:17 2013
@@ -62,7 +62,7 @@
#define APP_CHANNELS_BUCKETS 7
/*!
- * \brief Number of buckets for the blob_handlers container. Remember to keep
+ * \brief Number of buckets for the *_blob_handlers containers. Remember to keep
* it a prime number!
*/
#define BLOB_HANDLER_BUCKETS 7
@@ -79,7 +79,10 @@
struct ao2_container *__app_controls;
/*! \brief Container for handlers for channel blob messages */
-struct ao2_container *blob_handlers;
+struct ao2_container *channel_blob_handlers;
+
+/*! \brief Container for handlers for bridge blob messages */
+struct ao2_container *bridge_blob_handlers;
/*! \brief Message router for the channel caching topic */
struct stasis_message_router *channel_router;
@@ -415,12 +418,15 @@
}
/*! \brief Typedef for blob handler callbacks */
-typedef struct ast_json *(*blob_handler_cb)(struct ast_channel_blob *);
+typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
+
+/*! \brief Typedef for bridge blob handler callbacks */
+typedef struct ast_json *(*bridge_blob_handler_cb)(struct ast_bridge_blob *);
/*! \brief AO2 refcounted object linking channel blob json type to its handler callback */
struct blob_handler {
char *type;
- blob_handler_cb handler;
+ void *handler;
};
static void blob_handler_dtor(void *obj)
@@ -722,6 +728,11 @@
return 0;
}
+static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
+{
+ ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
+}
+
static void sub_snapshot_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
@@ -743,27 +754,39 @@
msg = channel_monitors[i](old_snapshot, new_snapshot);
if (msg) {
- ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+ distribute_message(watching_apps, msg);
}
}
}
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
- ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static struct ast_json *handle_blob(struct ast_channel_blob *obj)
+static struct ast_json *handle_channel_blob(struct ast_channel_blob *obj)
{
RAII_VAR(struct blob_handler *, handler, NULL, ao2_cleanup);
const char *handler_type = ast_channel_blob_json_type(obj);
-
- handler = ao2_find(blob_handlers, handler_type, OBJ_KEY);
+ channel_blob_handler_cb handler_cb;
+
+ handler = ao2_find(channel_blob_handlers, handler_type, OBJ_KEY);
if (!handler) {
return NULL;
}
- return handler->handler(obj);
+ handler_cb = handler->handler;
+ return handler_cb(obj);
+}
+
+static struct ast_json *handle_bridge_blob(struct ast_bridge_blob *obj)
+{
+ RAII_VAR(struct blob_handler *, handler, NULL, ao2_cleanup);
+ const char *handler_type = ast_bridge_blob_json_type(obj);
+ bridge_blob_handler_cb handler_cb;
+
+ handler = ao2_find(bridge_blob_handlers, handler_type, OBJ_KEY);
+ if (!handler) {
+ return NULL;
+ }
+
+ handler_cb = handler->handler;
+ return handler_cb(obj);
}
static void sub_blob_handler(void *data,
@@ -784,7 +807,7 @@
return;
}
- msg = handle_blob(obj);
+ msg = handle_channel_blob(obj);
if (!msg) {
return;
}
@@ -792,18 +815,6 @@
distribute_message(watching_apps, msg);
}
-static int find_diff(void *uniqueid, void *arg, int flags)
-{
- struct ao2_container *secondary = arg;
- RAII_VAR(char *, ao2_uniqueid, NULL, ao2_cleanup);
-
- ao2_uniqueid = ao2_find(secondary, uniqueid, OBJ_KEY);
- if (!ao2_uniqueid) {
- return CMP_MATCH | CMP_STOP;
- }
-
- return 0;
-}
static struct bridge_info *bridge_info_create_or_update(char *bridge_uniqueid, char *entering_chan)
{
RAII_VAR(struct ao2_container *, entering_apps, NULL, ao2_cleanup);
@@ -866,11 +877,19 @@
/* find all apps where the intersection of the app channels of interest
* set and the bridge stasis channels set results in a non-empty set */
-static struct ao2_container *get_bridge_watching_apps(struct bridge_info *info)
+static struct ao2_container *get_bridge_watching_apps(struct ast_bridge_snapshot *bridge)
{
RAII_VAR(struct ao2_container *, out, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
- RAII_VAR(struct ao2_iterator *,callback_iter, NULL, ao2_iterator_destroy);
+ RAII_VAR(struct ao2_iterator *, callback_iter, NULL, ao2_iterator_destroy);
+ RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
+ char *bridge_uniqueid = ast_strdupa(bridge->uniqueid);
+
+ info = bridge_info_create_or_update(bridge_uniqueid, NULL);
+ if (!info) {
+ /* no interest in this bridge */
+ return NULL;
+ }
callback_iter = ao2_callback(apps, OBJ_MULTIPLE, check_app_intersect_cb, info->channels);
out = callback_iter->c;
@@ -886,6 +905,7 @@
static struct ast_json *app_bridge_event_create(
const char *event_name,
const struct ast_bridge_snapshot *snapshot,
+ const struct ast_channel_snapshot *chan_snapshot,
const struct ast_json *extra_info)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
@@ -911,6 +931,20 @@
}
}
+ if (chan_snapshot) {
+ int ret;
+
+ /* Mustn't already have a channel field */
+ ast_assert(ast_json_object_get(event, "channel") == NULL);
+
+ ret = ast_json_object_set(
+ event,
+ "channel", ast_channel_snapshot_to_json(chan_snapshot));
+ if (ret != 0) {
+ return NULL;
+ }
+ }
+
message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
return ast_json_ref(message);
@@ -929,7 +963,7 @@
return NULL;
}
- return app_bridge_event_create("bridge-snapshot", new_snapshot, NULL);
+ return app_bridge_event_create("bridge-snapshot", new_snapshot, NULL, NULL);
}
/*! \brief Handle bridge creation */
@@ -941,7 +975,7 @@
return NULL;
}
- return app_bridge_event_create("bridge-event-create", new_snapshot, NULL);
+ return app_bridge_event_create("bridge-event-create", new_snapshot, NULL, NULL);
}
/*! \brief Handle bridge destruction */
@@ -953,46 +987,13 @@
return NULL;
}
- return app_bridge_event_create("bridge-event-destroy", old_snapshot, NULL);
-}
-
-/*! \brief Handle channels entering and leaving */
-static struct ast_json *bridge_channel_diff(
- struct ast_bridge_snapshot *old_snapshot,
- struct ast_bridge_snapshot *new_snapshot)
-{
- RAII_VAR(char *, new_diff, NULL, ao2_cleanup);
- RAII_VAR(char *, old_diff, NULL, ao2_cleanup);
- RAII_VAR(struct ast_json *, uniqueid_json, NULL, ast_json_unref);
- char *event;
-
- if (!new_snapshot || !old_snapshot
- || ao2_container_count(new_snapshot->channels)
- == ao2_container_count(old_snapshot->channels)) {
- return NULL;
- }
-
- /* get the diff channel for new and old snapshot */
- new_diff = ao2_callback(new_snapshot->channels, OBJ_NODATA, find_diff, old_snapshot->channels);
- old_diff = ao2_callback(old_snapshot->channels, OBJ_NODATA, find_diff, new_snapshot->channels);
- if (new_diff) {
- event = "bridge-event-enter";
- uniqueid_json = ast_json_pack("s: s", "uniqueid", new_diff);
- } else if (old_diff) {
- event = "bridge-event-leave";
- uniqueid_json = ast_json_pack("s: s", "uniqueid", old_diff);
- } else {
- return NULL;
- }
-
- return app_bridge_event_create(event, old_snapshot, uniqueid_json);
+ return app_bridge_event_create("bridge-event-destroy", old_snapshot, NULL, NULL);
}
bridge_snapshot_monitor bridge_monitors[] = {
bridge_snapshot,
bridge_create,
bridge_destroy,
- bridge_channel_diff,
};
static void bridge_state_cb(void *data,
@@ -1004,18 +1005,10 @@
struct stasis_cache_update *update = stasis_message_data(message);
struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
- RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
- char *bridge_uniqueid = ast_strdupa(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
int i;
- info = bridge_info_create_or_update(bridge_uniqueid, NULL);
- if (!info) {
- /* no interest in this bridge */
- return;
- }
-
/* get list of apps that are interested in this bridge event */
- watching_apps = get_bridge_watching_apps(info);
+ watching_apps = get_bridge_watching_apps(new_snapshot ? new_snapshot : old_snapshot);
if (!watching_apps) {
return;
}
@@ -1025,9 +1018,44 @@
msg = bridge_monitors[i](old_snapshot, new_snapshot);
if (msg) {
- ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+ distribute_message(watching_apps, msg);
}
}
+}
+
+/*! \brief Handle bridge blobs */
+static void bridge_blob_cb(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+ RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+ struct ast_bridge_blob *blob = stasis_message_data(message);
+ const char *type = ast_bridge_blob_json_type(blob);
+
+ /* if this is a channel entry, delay watching app list generation */
+ if (strcmp("enter", type)) {
+ watching_apps = get_bridge_watching_apps(blob->bridge);
+ if (!watching_apps) {
+ return;
+ }
+ }
+
+ msg = handle_bridge_blob(blob);
+ if (!msg) {
+ return;
+ }
+
+ /* handle watching_apps list generation now that interested apps have been updated */
+ if (!watching_apps) {
+ watching_apps = get_bridge_watching_apps(blob->bridge);
+ if (!watching_apps) {
+ return;
+ }
+ }
+
+ distribute_message(watching_apps, msg);
}
static int list_merge_cb(void *obj, void *arg, int flags)
@@ -1043,36 +1071,15 @@
{
RAII_VAR(struct ao2_container *, watching_to, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, watching_from, NULL, ao2_cleanup);
- RAII_VAR(struct bridge_info *, info_to, NULL, ao2_cleanup);
- RAII_VAR(struct bridge_info *, info_from, NULL, ao2_cleanup);
- char *to_uniqueid = ast_strdupa(merge->to->uniqueid);
- char *from_uniqueid = ast_strdupa(merge->from->uniqueid);
struct ao2_container *watching_combined;
- info_to = bridge_info_create_or_update(to_uniqueid, NULL);
- info_from = bridge_info_create_or_update(from_uniqueid, NULL);
- if (!info_to && !info_from) {
- /* no interest in this merge */
- return NULL;
- }
-
- if (info_to) {
- watching_to = get_bridge_watching_apps(info_to);
- if (!watching_to) {
- return NULL;
- }
- }
- if (info_from) {
- watching_from = get_bridge_watching_apps(info_from);
- if (!watching_from) {
- return NULL;
- }
- }
+ watching_to = get_bridge_watching_apps(merge->to);
+ watching_from = get_bridge_watching_apps(merge->from);
/* handle the merge of the lists */
- if (info_to && !info_from) {
+ if (watching_to && !watching_from) {
watching_combined = watching_to;
- } else if (!info_to && info_from) {
+ } else if (!watching_to && watching_from) {
watching_combined = watching_from;
} else {
/* apps are watching both bridges */
@@ -1103,7 +1110,7 @@
return;
}
- ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
+ distribute_message(watching_apps, msg);
}
/*!
@@ -1285,6 +1292,23 @@
return app_channel_event_create("channel-event-dtmf-received", obj->snapshot, extra);
}
+static struct ast_json *handle_blob_enter(struct ast_bridge_blob *obj)
+{
+ RAII_VAR(struct bridge_info *, info, NULL, ao2_cleanup);
+ char *bridge_uniqueid = ast_strdupa(obj->bridge->uniqueid);
+ char *channel_uniqueid = ast_strdupa(obj->channel->uniqueid);
+
+ /* force the bridge-of-interest channel-of-interest listing to update for the new channel */
+ info = bridge_info_create_or_update(bridge_uniqueid, channel_uniqueid);
+
+ return app_bridge_event_create("bridge-event-enter", obj->bridge, obj->channel, NULL);
+}
+
+static struct ast_json *handle_blob_leave(struct ast_bridge_blob *obj)
+{
+ return app_bridge_event_create("bridge-event-leave", obj->bridge, obj->channel, NULL);
+}
+
static struct ast_json *handle_blob_generic(struct ast_channel_blob *obj)
{
RAII_VAR(struct ast_str *, event_name, ast_str_create(32), ast_free);
@@ -1294,7 +1318,7 @@
return app_channel_event_create(ast_str_buffer(event_name), obj->snapshot, obj->blob);
}
-static void register_blob_handler(const char *blob_type, blob_handler_cb blob_type_handler_cb)
+static void register_blob_handler(struct ao2_container *handlers, const char *blob_type, void *handler_cb)
{
RAII_VAR(struct blob_handler *, handler, ao2_alloc(sizeof(*handler), blob_handler_dtor), ao2_cleanup);
@@ -1302,13 +1326,13 @@
return;
}
- handler->handler = blob_type_handler_cb;
+ handler->handler = handler_cb;
handler->type = ast_strdup(blob_type);
if (!handler->type) {
return;
}
- ao2_link(blob_handlers, handler);
+ ao2_link(handlers, handler);
}
static int blob_handler_hash(const void *obj, const int flags)
@@ -1340,15 +1364,15 @@
return AST_MODULE_LOAD_FAILURE;
}
- blob_handlers = ao2_container_alloc(BLOB_HANDLER_BUCKETS, blob_handler_hash, blob_handler_cmp);
- if (!blob_handlers) {
+ channel_blob_handlers = ao2_container_alloc(BLOB_HANDLER_BUCKETS, blob_handler_hash, blob_handler_cmp);
+ if (!channel_blob_handlers) {
return AST_MODULE_LOAD_FAILURE;
}
- register_blob_handler("userevent", handle_blob_generic);
- register_blob_handler("hangup_request", handle_blob_generic);
- register_blob_handler("varset", handle_blob_generic);
- register_blob_handler("dtmf_end", handle_blob_dtmf);
+ register_blob_handler(channel_blob_handlers, "userevent", handle_blob_generic);
+ register_blob_handler(channel_blob_handlers, "hangup_request", handle_blob_generic);
+ register_blob_handler(channel_blob_handlers, "varset", handle_blob_generic);
+ register_blob_handler(channel_blob_handlers, "dtmf_end", handle_blob_dtmf);
channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
if (!channel_router) {
@@ -1366,6 +1390,14 @@
return AST_MODULE_LOAD_FAILURE;
}
+ bridge_blob_handlers = ao2_container_alloc(BLOB_HANDLER_BUCKETS, blob_handler_hash, blob_handler_cmp);
+ if (!bridge_blob_handlers) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ register_blob_handler(bridge_blob_handlers, "enter", handle_blob_enter);
+ register_blob_handler(bridge_blob_handlers, "leave", handle_blob_leave);
+
bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached()));
if (!bridge_router) {
return AST_MODULE_LOAD_FAILURE;
@@ -1375,6 +1407,11 @@
bridge_router,
stasis_cache_update_type(),
bridge_state_cb,
+ NULL);
+ ret |= stasis_message_router_add(
+ bridge_router,
+ ast_bridge_blob_type(),
+ bridge_blob_cb,
NULL);
ret |= stasis_message_router_add(
bridge_router,
@@ -1395,8 +1432,11 @@
stasis_message_router_unsubscribe(channel_router);
channel_router = NULL;
- ao2_cleanup(blob_handlers);
- blob_handlers = NULL;
+ ao2_cleanup(channel_blob_handlers);
+ channel_blob_handlers = NULL;
+
+ ao2_cleanup(bridge_blob_handlers);
+ bridge_blob_handlers = NULL;
stasis_message_router_unsubscribe(bridge_router);
bridge_router = NULL;
More information about the asterisk-commits
mailing list