[asterisk-commits] dlee: branch dlee/ASTERISK-21969 r396398 - in /team/dlee/ASTERISK-21969: main...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Aug 8 12:46:33 CDT 2013


Author: dlee
Date: Thu Aug  8 12:46:30 2013
New Revision: 396398

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396398
Log:
Bridge enter/leave events

Modified:
    team/dlee/ASTERISK-21969/main/stasis_bridges.c
    team/dlee/ASTERISK-21969/res/res_stasis.c
    team/dlee/ASTERISK-21969/res/stasis/app.c
    team/dlee/ASTERISK-21969/res/stasis/app.h

Modified: team/dlee/ASTERISK-21969/main/stasis_bridges.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/main/stasis_bridges.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/main/stasis_bridges.c (original)
+++ team/dlee/ASTERISK-21969/main/stasis_bridges.c Thu Aug  8 12:46:30 2013
@@ -132,6 +132,8 @@
 
 static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message);
 static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg);
 
 static struct stasis_cp_all *bridge_cache_all;
 
@@ -140,8 +142,10 @@
  */
 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+	.to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+	.to_json = ast_channel_left_bridge_to_json);
 STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami);
 STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
 /*! @} */
@@ -415,6 +419,35 @@
 	/* state first, then leave blob (opposite of enter, preserves nesting of events) */
 	bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
 	stasis_publish(ast_bridge_topic(bridge), msg);
+}
+
+static struct ast_json *simple_bridge_channel_event(
+        const char *type,
+        struct ast_bridge_snapshot *bridge_snapshot,
+        struct ast_channel_snapshot *channel_snapshot,
+        const struct timeval *tv)
+{
+        return ast_json_pack("{s: s, s: o, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+                "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+	return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+	return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
 }
 
 typedef struct ast_json *(*json_item_serializer_cb)(void *obj);

Modified: team/dlee/ASTERISK-21969/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/res_stasis.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/res_stasis.c (original)
+++ team/dlee/ASTERISK-21969/res/res_stasis.c Thu Aug  8 12:46:30 2013
@@ -455,7 +455,6 @@
 
 	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
-	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
 	int res = 0;
 
 	ast_assert(chan != NULL);
@@ -486,9 +485,9 @@
 		return -1;
 	}
 
-	forwards = app_subscribe_channel(app, chan);
-	if (!forwards) {
-		ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
+	res = app_subscribe_channel(app, chan);
+	if (res != 0) {
+		ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
 			app_name, ast_channel_name(chan));
 		return -1;
 	}
@@ -497,8 +496,18 @@
 		RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
 		int r;
 		int command_count;
-
-		if (stasis_app_get_bridge(control)) {
+		struct ast_bridge *last_bridge = NULL;
+		struct ast_bridge *bridge = NULL;
+
+		last_bridge = bridge;
+		bridge = stasis_app_get_bridge(control);
+
+		if (bridge != last_bridge) {
+			app_unsubscribe_bridge(app, last_bridge);
+			app_subscribe_bridge(app, bridge);
+		}
+
+		if (bridge) {
 			/* Bridge is handling channel frames */
 			control_wait(control);
 			control_dispatch_all(control, chan);
@@ -548,6 +557,8 @@
 		}
 	}
 
+	app_unsubscribe_channel(app, chan);
+
 	res = send_end_msg(app, chan);
 	if (res != 0) {
 		ast_log(LOG_ERROR,

Modified: team/dlee/ASTERISK-21969/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.c (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.c Thu Aug  8 12:46:30 2013
@@ -35,12 +35,12 @@
 #include "asterisk/stasis_message_router.h"
 
 struct app {
-	/*! Atomic integer counting channels in this app */
-	int channel_count;
 	/*! Aggregation topic for this application. */
 	struct stasis_topic *topic;
 	/*! Router for handling messages forwarded to \a topic. */
 	struct stasis_message_router *router;
+	/*! Container of the channel forwards to this app's topic. */
+	struct ao2_container *forwards;
 	/*! Callback function for this application. */
 	stasis_app_cb handler;
 	/*! Opaque data to hand to callback function. */
@@ -50,14 +50,149 @@
 };
 
 struct app_forwards {
-	struct app *app;
-
-	struct stasis_subscription *channel_forward;
-	struct stasis_subscription *channel_cached_forward;
-
-	struct stasis_subscription *bridge_forward;
-	struct stasis_subscription *bridge_cached_forward;
+	/*! Count of number of times this channel/bridge has been subscribed */
+	int interested;
+
+	/*! Forward for the regular topic */
+	struct stasis_subscription *topic_forward;
+	/*! Forward for the caching topic */
+	struct stasis_subscription *topic_cached_forward;
+
+	/*! Unique id of the object being forwarded */
+	char id[];
 };
+
+static void forwards_dtor(void *obj)
+{
+	struct app_forwards *forwards = obj;
+
+	ast_assert(forwards->topic_forward == NULL);
+	ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+	stasis_unsubscribe(forwards->topic_forward);
+	forwards->topic_forward = NULL;
+	stasis_unsubscribe(forwards->topic_cached_forward);
+	forwards->topic_cached_forward = NULL;
+}
+
+static struct app_forwards *forwards_create(struct app *app,
+	const char *id)
+{
+	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+	if (!app || !id) {
+		return NULL;
+	}
+
+	forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+	if (!forwards) {
+		return NULL;
+	}
+
+	strcpy(forwards->id, id);
+
+	ao2_ref(forwards, +1);
+	return forwards;
+}
+
+static struct app_forwards *forwards_create_channel(struct app *app,
+	struct ast_channel *chan)
+{
+	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+	if (!app || !chan) {
+		return NULL;
+	}
+
+	forwards = forwards_create(app, ast_channel_uniqueid(chan));
+	if (!forwards) {
+		return NULL;
+	}
+
+	forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+		app->topic);
+	if (!forwards->topic_forward) {
+		return NULL;
+	}
+
+	forwards->topic_cached_forward = stasis_forward_all(
+		ast_channel_topic_cached(chan), app->topic);
+	if (!forwards->topic_cached_forward) {
+		/* Half-subscribed is a bad thing */
+		stasis_unsubscribe(forwards->topic_forward);
+		forwards->topic_forward = NULL;
+		return NULL;
+	}
+
+	ao2_ref(forwards, +1);
+	return forwards;
+}
+
+static struct app_forwards *forwards_create_bridge(struct app *app,
+	struct ast_bridge *bridge)
+{
+	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+	if (!app || !bridge) {
+		return NULL;
+	}
+
+	forwards = forwards_create(app, bridge->uniqueid);
+	if (!forwards) {
+		return NULL;
+	}
+
+	forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+		app->topic);
+	if (!forwards->topic_forward) {
+		return NULL;
+	}
+
+	forwards->topic_cached_forward = stasis_forward_all(
+		ast_bridge_topic_cached(bridge), app->topic);
+	if (!forwards->topic_cached_forward) {
+		/* Half-subscribed is a bad thing */
+		stasis_unsubscribe(forwards->topic_forward);
+		forwards->topic_forward = NULL;
+		return NULL;
+	}
+
+	ao2_ref(forwards, +1);
+	return forwards;
+}
+
+static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
+{
+    const struct app_forwards *object_left = obj_left;
+    const struct app_forwards *object_right = obj_right;
+    const char *right_key = obj_right;
+    int cmp;
+
+    switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+    case OBJ_POINTER:
+        right_key = object_right->id;
+        /* Fall through */
+    case OBJ_KEY:
+        cmp = strcmp(object_left->id, right_key);
+        break;
+    case OBJ_PARTIAL_KEY:
+        /*
+         * We could also use a partial key struct containing a length
+         * so strlen() does not get called for every comparison instead.
+         */
+        cmp = strncmp(object_left->id, right_key, strlen(right_key));
+        break;
+    default:
+        /* Sort can only work on something with a full or partial key. */
+        ast_assert(0);
+        cmp = 0;
+        break;
+    }
+    return cmp;
+}
 
 static void app_dtor(void *obj)
 {
@@ -84,6 +219,55 @@
 	app_send(app, json);
 }
 
+static struct ast_json *simple_bridge_event(
+        const char *type,
+        struct ast_bridge_snapshot *snapshot,
+        const struct timeval *tv)
+{
+        return ast_json_pack("{s: s, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
+
+static void sub_bridge_update_handler(void *data,
+                struct stasis_subscription *sub,
+                struct stasis_topic *topic,
+                struct stasis_message *message)
+{
+	struct app *app = data;
+        struct stasis_cache_update *update;
+        struct ast_bridge_snapshot *new_snapshot;
+        struct ast_bridge_snapshot *old_snapshot;
+        const struct timeval *tv;
+        RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+	ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+	update = stasis_message_data(message);
+
+	ast_assert(update->type == ast_bridge_snapshot_type());
+
+	new_snapshot = stasis_message_data(update->new_snapshot);
+	old_snapshot = stasis_message_data(update->old_snapshot);
+	tv = update->new_snapshot ?
+		stasis_message_timestamp(update->new_snapshot) :
+		stasis_message_timestamp(message);
+
+
+        if (!new_snapshot) {
+                json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
+        } else if (!old_snapshot) {
+                json = simple_bridge_event("BridgeCreated", old_snapshot, tv);
+        }
+
+        if (!json) {
+                return;
+        }
+
+        app_send(app, json);
+}
+
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
 	RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -102,6 +286,10 @@
 		return NULL;
 	}
 
+	app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+		AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+		forwards_sort, NULL);
+
 	app->topic = stasis_topic_create(name);
 	if (!app->topic) {
 		return NULL;
@@ -112,26 +300,13 @@
 		return NULL;
 	}
 
+        res |= stasis_message_router_add_cache_update(app->router,
+		ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
+
 	/*
-	res |= stasis_message_router_add(app->router,
-		ast_channel_user_event_type(), sub_channel_blob_handler, app);
-        res |= stasis_message_router_add(app->router,
-		ast_channel_varset_type(), sub_channel_blob_handler, app);
-        res |= stasis_message_router_add(app->router,
-		ast_channel_dtmf_end_type(), sub_channel_blob_handler, app);
-        res |= stasis_message_router_add(app->router,
-		ast_channel_hangup_request_type(), sub_channel_blob_handler,
-		app);
-        res |= stasis_message_router_add(app->router,
-		stasis_cache_update_type(), sub_bridge_snapshot_handler, app);
         res |= stasis_message_router_add(app->router,
 		ast_bridge_merge_message_type(), sub_bridge_merge_handler,
 		app);
-        res |= stasis_message_router_add(app->router,
-		ast_channel_entered_bridge_type(), sub_bridge_enter_handler,
-		app);
-        res |= stasis_message_router_add(app->router,
-		ast_channel_left_bridge_type(), sub_bridge_leave_handler, app);
 	*/
 	res |= stasis_message_router_set_default(app->router,
 		sub_default_handler, app);
@@ -199,8 +374,7 @@
 {
 	SCOPED_AO2LOCK(lock, app);
 
-	return app->handler == NULL &&
-		ast_atomic_fetchadd_int(&app->channel_count, 0) == 0;
+	return app->handler == NULL && ao2_container_count(app->forwards) == 0;
 }
 
 void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -236,96 +410,93 @@
 	return app->name;
 }
 
-static void forwards_dtor(void *obj)
-{
-	struct app_forwards *forwards = obj;
-
-	stasis_unsubscribe(forwards->channel_forward);
-	forwards->channel_forward = NULL;
-	stasis_unsubscribe(forwards->channel_cached_forward);
-	forwards->channel_cached_forward = NULL;
-	stasis_unsubscribe(forwards->bridge_forward);
-	forwards->bridge_forward = NULL;
-	stasis_unsubscribe(forwards->bridge_cached_forward);
-	forwards->bridge_cached_forward = NULL;
-
-	ast_atomic_fetchadd_int(&forwards->app->channel_count, -1);
-
-	ao2_cleanup(forwards->app);
-	forwards->app = NULL;
-}
-
-struct app_forwards *app_subscribe_channel(struct app *app,
-	struct ast_channel *chan)
+int app_subscribe_channel(struct app *app, struct ast_channel *chan)
+{
+	if (!app || !chan) {
+		return -1;
+	} else {
+		RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+		SCOPED_AO2LOCK(lock, app->forwards);
+
+		forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
+			OBJ_SEARCH_KEY | OBJ_NOLOCK);
+		if (!forwards) {
+			/* Forwards not found, create one */
+			forwards = forwards_create_channel(app, chan);
+			if (!forwards) {
+				return -1;
+			}
+			ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+		}
+
+		++forwards->interested;
+		return 0;
+	}
+}
+
+static int unsubscribe(struct app *app, const char *kind, const char *id)
 {
 	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-
+	SCOPED_AO2LOCK(lock, app->forwards);
+
+	forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (!forwards) {
+		ast_log(LOG_ERROR,
+			"App '%s' not subscribed to %s '%s'",
+			app->name, kind, id);
+		return -1;
+	}
+
+	if (--forwards->interested == 0) {
+		/* No one is interested any more; unsubscribe */
+		forwards_unsubscribe(forwards);
+		ao2_find(app->forwards, forwards,
+			OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
+			OBJ_NODATA);
+	}
+
+	return 0;
+}
+
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
+{
 	if (!app || !chan) {
-		return NULL;
-	}
-
-	forwards = ao2_alloc(sizeof(*forwards), forwards_dtor);
-	if (!forwards) {
-		return NULL;
-	}
-
-	ao2_ref(app, +1);
-	forwards->app = app;
-
-	forwards->channel_forward = stasis_forward_all(ast_channel_topic(chan),
-		app->topic);
-	if (!forwards->channel_forward) {
-		return NULL;
-	}
-
-	forwards->channel_cached_forward = stasis_forward_all(
-		ast_channel_topic_cached(chan), app->topic);
-	if (!forwards->channel_cached_forward) {
-		return NULL;
-	}
-
-	ast_atomic_fetchadd_int(&forwards->app->channel_count, +1);
-	ao2_ref(forwards, +1);
-	return forwards;
-}
-
-int app_subscribe_bridge(struct app_forwards *forwards,
-	struct ast_bridge *bridge)
-{
-	if (!forwards || !bridge) {
 		return -1;
 	}
 
-	ast_assert(forwards->app != NULL);
-
-	app_unsubscribe_bridge(forwards);
-
-	forwards->bridge_forward = stasis_forward_all(
-		ast_bridge_topic(bridge), forwards->app->topic);
-	if (!forwards->bridge_forward) {
+	return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+}
+
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+	if (!app || !bridge) {
 		return -1;
-	}
-
-	forwards->bridge_cached_forward = stasis_forward_all(
-		ast_bridge_topic_cached(bridge), forwards->app->topic);
-	if (!forwards->bridge_cached_forward) {
-		/* Probably a bad idea to stay half-subscribed */
-		stasis_unsubscribe(forwards->bridge_forward);
-		forwards->bridge_forward = NULL;
+	} else {
+		RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+		SCOPED_AO2LOCK(lock, app->forwards);
+
+		forwards = ao2_find(app->forwards, bridge->uniqueid,
+			OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+		if (!forwards) {
+			/* Forwards not found, create one */
+			forwards = forwards_create_bridge(app, bridge);
+			if (!forwards) {
+				return -1;
+			}
+			ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+		}
+
+		++forwards->interested;
+		return 0;
+	}
+}
+
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+	if (!app || !bridge) {
 		return -1;
 	}
 
-	return 0;
-}
-
-void app_unsubscribe_bridge(struct app_forwards *forwards)
-{
-	if (!forwards) {
-		return;
-	}
-
-	stasis_unsubscribe(forwards->bridge_forward);
-	forwards->bridge_forward = NULL;
-	stasis_unsubscribe(forwards->bridge_cached_forward);
-	forwards->bridge_cached_forward = NULL;
-}
+	return unsubscribe(app, "bridge", bridge->uniqueid);
+}

Modified: team/dlee/ASTERISK-21969/res/stasis/app.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.h?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.h (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.h Thu Aug  8 12:46:30 2013
@@ -116,19 +116,25 @@
  * \return 0 on success.
  * \return Non-zero on error.
  */
-struct app_forwards *app_subscribe_channel(struct app *app,
-	struct ast_channel *chan);
+int app_subscribe_channel(struct app *app, struct ast_channel *chan);
+
+/*!
+ * \brief Cancel the subscription an app has for a channel.
+ *
+ * \param app Subscribing application.
+ * \param forwards Returned object from app_subscribe_channel().
+ */
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
 
 /*!
  * \brief Add a bridge subscription to an existing channel subscription.
  *
- * \param forwards Return from app_subscribe_channel().
+ * \param app Application.
  * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_subscribe_bridge(struct app_forwards *forwards,
-	struct ast_bridge *bridge);
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
 
 /*!
  * \brief Cancel the bridge subscription for an application.
@@ -138,6 +144,6 @@
  * \return 0 on success.
  * \return Non-zero on error.
  */
-void app_unsubscribe_bridge(struct app_forwards *forwards);
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
 
 #endif /* _ASTERISK_RES_STASIS_APP_H */




More information about the asterisk-commits mailing list