[asterisk-commits] dlee: branch dlee/ASTERISK-21969 r396398 - in /team/dlee/ASTERISK-21969: main...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Aug 8 12:46:33 CDT 2013
Author: dlee
Date: Thu Aug 8 12:46:30 2013
New Revision: 396398
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396398
Log:
Bridge enter/leave events
Modified:
team/dlee/ASTERISK-21969/main/stasis_bridges.c
team/dlee/ASTERISK-21969/res/res_stasis.c
team/dlee/ASTERISK-21969/res/stasis/app.c
team/dlee/ASTERISK-21969/res/stasis/app.h
Modified: team/dlee/ASTERISK-21969/main/stasis_bridges.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/main/stasis_bridges.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/main/stasis_bridges.c (original)
+++ team/dlee/ASTERISK-21969/main/stasis_bridges.c Thu Aug 8 12:46:30 2013
@@ -132,6 +132,8 @@
static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg);
static struct stasis_cp_all *bridge_cache_all;
@@ -140,8 +142,10 @@
*/
STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+ .to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+ .to_json = ast_channel_left_bridge_to_json);
STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami);
STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
/*! @} */
@@ -415,6 +419,35 @@
/* state first, then leave blob (opposite of enter, preserves nesting of events) */
bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
stasis_publish(ast_bridge_topic(bridge), msg);
+}
+
+static struct ast_json *simple_bridge_channel_event(
+ const char *type,
+ struct ast_bridge_snapshot *bridge_snapshot,
+ struct ast_channel_snapshot *channel_snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+ "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+ return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+ struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+ return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+ obj->channel, stasis_message_timestamp(msg));
}
typedef struct ast_json *(*json_item_serializer_cb)(void *obj);
Modified: team/dlee/ASTERISK-21969/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/res_stasis.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/res_stasis.c (original)
+++ team/dlee/ASTERISK-21969/res/res_stasis.c Thu Aug 8 12:46:30 2013
@@ -455,7 +455,6 @@
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
int res = 0;
ast_assert(chan != NULL);
@@ -486,9 +485,9 @@
return -1;
}
- forwards = app_subscribe_channel(app, chan);
- if (!forwards) {
- ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
+ res = app_subscribe_channel(app, chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
app_name, ast_channel_name(chan));
return -1;
}
@@ -497,8 +496,18 @@
RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
int r;
int command_count;
-
- if (stasis_app_get_bridge(control)) {
+ struct ast_bridge *last_bridge = NULL;
+ struct ast_bridge *bridge = NULL;
+
+ last_bridge = bridge;
+ bridge = stasis_app_get_bridge(control);
+
+ if (bridge != last_bridge) {
+ app_unsubscribe_bridge(app, last_bridge);
+ app_subscribe_bridge(app, bridge);
+ }
+
+ if (bridge) {
/* Bridge is handling channel frames */
control_wait(control);
control_dispatch_all(control, chan);
@@ -548,6 +557,8 @@
}
}
+ app_unsubscribe_channel(app, chan);
+
res = send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR,
Modified: team/dlee/ASTERISK-21969/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.c (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.c Thu Aug 8 12:46:30 2013
@@ -35,12 +35,12 @@
#include "asterisk/stasis_message_router.h"
struct app {
- /*! Atomic integer counting channels in this app */
- int channel_count;
/*! Aggregation topic for this application. */
struct stasis_topic *topic;
/*! Router for handling messages forwarded to \a topic. */
struct stasis_message_router *router;
+ /*! Container of the channel forwards to this app's topic. */
+ struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
@@ -50,14 +50,149 @@
};
struct app_forwards {
- struct app *app;
-
- struct stasis_subscription *channel_forward;
- struct stasis_subscription *channel_cached_forward;
-
- struct stasis_subscription *bridge_forward;
- struct stasis_subscription *bridge_cached_forward;
+ /*! Count of number of times this channel/bridge has been subscribed */
+ int interested;
+
+ /*! Forward for the regular topic */
+ struct stasis_subscription *topic_forward;
+ /*! Forward for the caching topic */
+ struct stasis_subscription *topic_cached_forward;
+
+ /*! Unique id of the object being forwarded */
+ char id[];
};
+
+static void forwards_dtor(void *obj)
+{
+ struct app_forwards *forwards = obj;
+
+ ast_assert(forwards->topic_forward == NULL);
+ ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ stasis_unsubscribe(forwards->topic_cached_forward);
+ forwards->topic_cached_forward = NULL;
+}
+
+static struct app_forwards *forwards_create(struct app *app,
+ const char *id)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !id) {
+ return NULL;
+ }
+
+ forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+ if (!forwards) {
+ return NULL;
+ }
+
+ strcpy(forwards->id, id);
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+static struct app_forwards *forwards_create_channel(struct app *app,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !chan) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_channel_topic_cached(chan), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+static struct app_forwards *forwards_create_bridge(struct app *app,
+ struct ast_bridge *bridge)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !bridge) {
+ return NULL;
+ }
+
+ forwards = forwards_create(app, bridge->uniqueid);
+ if (!forwards) {
+ return NULL;
+ }
+
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
+ if (!forwards->topic_forward) {
+ return NULL;
+ }
+
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_bridge_topic_cached(bridge), app->topic);
+ if (!forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ stasis_unsubscribe(forwards->topic_forward);
+ forwards->topic_forward = NULL;
+ return NULL;
+ }
+
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
+{
+ const struct app_forwards *object_left = obj_left;
+ const struct app_forwards *object_right = obj_right;
+ const char *right_key = obj_right;
+ int cmp;
+
+ switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+ case OBJ_POINTER:
+ right_key = object_right->id;
+ /* Fall through */
+ case OBJ_KEY:
+ cmp = strcmp(object_left->id, right_key);
+ break;
+ case OBJ_PARTIAL_KEY:
+ /*
+ * We could also use a partial key struct containing a length
+ * so strlen() does not get called for every comparison instead.
+ */
+ cmp = strncmp(object_left->id, right_key, strlen(right_key));
+ break;
+ default:
+ /* Sort can only work on something with a full or partial key. */
+ ast_assert(0);
+ cmp = 0;
+ break;
+ }
+ return cmp;
+}
static void app_dtor(void *obj)
{
@@ -84,6 +219,55 @@
app_send(app, json);
}
+static struct ast_json *simple_bridge_event(
+ const char *type,
+ struct ast_bridge_snapshot *snapshot,
+ const struct timeval *tv)
+{
+ return ast_json_pack("{s: s, s: o, s: o}",
+ "type", type,
+ "timestamp", ast_json_timeval(*tv, NULL),
+ "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
+
+static void sub_bridge_update_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ struct app *app = data;
+ struct stasis_cache_update *update;
+ struct ast_bridge_snapshot *new_snapshot;
+ struct ast_bridge_snapshot *old_snapshot;
+ const struct timeval *tv;
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+ ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+ update = stasis_message_data(message);
+
+ ast_assert(update->type == ast_bridge_snapshot_type());
+
+ new_snapshot = stasis_message_data(update->new_snapshot);
+ old_snapshot = stasis_message_data(update->old_snapshot);
+ tv = update->new_snapshot ?
+ stasis_message_timestamp(update->new_snapshot) :
+ stasis_message_timestamp(message);
+
+
+ if (!new_snapshot) {
+ json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
+ } else if (!old_snapshot) {
+ json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
+ }
+
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
+}
+
struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -102,6 +286,10 @@
return NULL;
}
+ app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+ AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+ forwards_sort, NULL);
+
app->topic = stasis_topic_create(name);
if (!app->topic) {
return NULL;
@@ -112,26 +300,13 @@
return NULL;
}
+ res |= stasis_message_router_add_cache_update(app->router,
+ ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
+
/*
- res |= stasis_message_router_add(app->router,
- ast_channel_user_event_type(), sub_channel_blob_handler, app);
- res |= stasis_message_router_add(app->router,
- ast_channel_varset_type(), sub_channel_blob_handler, app);
- res |= stasis_message_router_add(app->router,
- ast_channel_dtmf_end_type(), sub_channel_blob_handler, app);
- res |= stasis_message_router_add(app->router,
- ast_channel_hangup_request_type(), sub_channel_blob_handler,
- app);
- res |= stasis_message_router_add(app->router,
- stasis_cache_update_type(), sub_bridge_snapshot_handler, app);
res |= stasis_message_router_add(app->router,
ast_bridge_merge_message_type(), sub_bridge_merge_handler,
app);
- res |= stasis_message_router_add(app->router,
- ast_channel_entered_bridge_type(), sub_bridge_enter_handler,
- app);
- res |= stasis_message_router_add(app->router,
- ast_channel_left_bridge_type(), sub_bridge_leave_handler, app);
*/
res |= stasis_message_router_set_default(app->router,
sub_default_handler, app);
@@ -199,8 +374,7 @@
{
SCOPED_AO2LOCK(lock, app);
- return app->handler == NULL &&
- ast_atomic_fetchadd_int(&app->channel_count, 0) == 0;
+ return app->handler == NULL && ao2_container_count(app->forwards) == 0;
}
void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -236,96 +410,93 @@
return app->name;
}
-static void forwards_dtor(void *obj)
-{
- struct app_forwards *forwards = obj;
-
- stasis_unsubscribe(forwards->channel_forward);
- forwards->channel_forward = NULL;
- stasis_unsubscribe(forwards->channel_cached_forward);
- forwards->channel_cached_forward = NULL;
- stasis_unsubscribe(forwards->bridge_forward);
- forwards->bridge_forward = NULL;
- stasis_unsubscribe(forwards->bridge_cached_forward);
- forwards->bridge_cached_forward = NULL;
-
- ast_atomic_fetchadd_int(&forwards->app->channel_count, -1);
-
- ao2_cleanup(forwards->app);
- forwards->app = NULL;
-}
-
-struct app_forwards *app_subscribe_channel(struct app *app,
- struct ast_channel *chan)
+int app_subscribe_channel(struct app *app, struct ast_channel *chan)
+{
+ if (!app || !chan) {
+ return -1;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
+}
+
+static int unsubscribe(struct app *app, const char *kind, const char *id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ ast_log(LOG_ERROR,
+ "App '%s' not subscribed to %s '%s'",
+ app->name, kind, id);
+ return -1;
+ }
+
+ if (--forwards->interested == 0) {
+ /* No one is interested any more; unsubscribe */
+ forwards_unsubscribe(forwards);
+ ao2_find(app->forwards, forwards,
+ OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
+ OBJ_NODATA);
+ }
+
+ return 0;
+}
+
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
+{
if (!app || !chan) {
- return NULL;
- }
-
- forwards = ao2_alloc(sizeof(*forwards), forwards_dtor);
- if (!forwards) {
- return NULL;
- }
-
- ao2_ref(app, +1);
- forwards->app = app;
-
- forwards->channel_forward = stasis_forward_all(ast_channel_topic(chan),
- app->topic);
- if (!forwards->channel_forward) {
- return NULL;
- }
-
- forwards->channel_cached_forward = stasis_forward_all(
- ast_channel_topic_cached(chan), app->topic);
- if (!forwards->channel_cached_forward) {
- return NULL;
- }
-
- ast_atomic_fetchadd_int(&forwards->app->channel_count, +1);
- ao2_ref(forwards, +1);
- return forwards;
-}
-
-int app_subscribe_bridge(struct app_forwards *forwards,
- struct ast_bridge *bridge)
-{
- if (!forwards || !bridge) {
return -1;
}
- ast_assert(forwards->app != NULL);
-
- app_unsubscribe_bridge(forwards);
-
- forwards->bridge_forward = stasis_forward_all(
- ast_bridge_topic(bridge), forwards->app->topic);
- if (!forwards->bridge_forward) {
+ return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+}
+
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+ if (!app || !bridge) {
return -1;
- }
-
- forwards->bridge_cached_forward = stasis_forward_all(
- ast_bridge_topic_cached(bridge), forwards->app->topic);
- if (!forwards->bridge_cached_forward) {
- /* Probably a bad idea to stay half-subscribed */
- stasis_unsubscribe(forwards->bridge_forward);
- forwards->bridge_forward = NULL;
+ } else {
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, bridge->uniqueid,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ }
+
+ ++forwards->interested;
+ return 0;
+ }
+}
+
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+ if (!app || !bridge) {
return -1;
}
- return 0;
-}
-
-void app_unsubscribe_bridge(struct app_forwards *forwards)
-{
- if (!forwards) {
- return;
- }
-
- stasis_unsubscribe(forwards->bridge_forward);
- forwards->bridge_forward = NULL;
- stasis_unsubscribe(forwards->bridge_cached_forward);
- forwards->bridge_cached_forward = NULL;
-}
+ return unsubscribe(app, "bridge", bridge->uniqueid);
+}
Modified: team/dlee/ASTERISK-21969/res/stasis/app.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.h?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.h (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.h Thu Aug 8 12:46:30 2013
@@ -116,19 +116,25 @@
* \return 0 on success.
* \return Non-zero on error.
*/
-struct app_forwards *app_subscribe_channel(struct app *app,
- struct ast_channel *chan);
+int app_subscribe_channel(struct app *app, struct ast_channel *chan);
+
+/*!
+ * \brief Cancel the subscription an app has for a channel.
+ *
+ * \param app Subscribing application.
+ * \param forwards Returned object from app_subscribe_channel().
+ */
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
/*!
* \brief Add a bridge subscription to an existing channel subscription.
*
- * \param forwards Return from app_subscribe_channel().
+ * \param app Application.
* \param bridge Bridge to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_subscribe_bridge(struct app_forwards *forwards,
- struct ast_bridge *bridge);
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
/*!
* \brief Cancel the bridge subscription for an application.
@@ -138,6 +144,6 @@
* \return 0 on success.
* \return Non-zero on error.
*/
-void app_unsubscribe_bridge(struct app_forwards *forwards);
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
#endif /* _ASTERISK_RES_STASIS_APP_H */
More information about the asterisk-commits
mailing list