[svn-commits] dlee: branch dlee/ASTERISK-21969 r396360 - in /team/dlee/ASTERISK-21969/res: ...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Wed Aug 7 15:53:52 CDT 2013
Author: dlee
Date: Wed Aug 7 15:53:51 2013
New Revision: 396360
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396360
Log:
The pieces are scattered all over the floor.
Modified:
team/dlee/ASTERISK-21969/res/res_stasis.c
team/dlee/ASTERISK-21969/res/stasis/app.c
team/dlee/ASTERISK-21969/res/stasis/app.h
Modified: team/dlee/ASTERISK-21969/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/res_stasis.c?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/res_stasis.c (original)
+++ team/dlee/ASTERISK-21969/res/res_stasis.c Wed Aug 7 15:53:51 2013
@@ -82,6 +82,12 @@
#define CONTROLS_NUM_BUCKETS 127
/*!
+ * \brief Number of buckets for the Stasis bridges hash table. Remember to
+ * keep it a prime number!
+ */
+#define BRIDGES_NUM_BUCKETS 127
+
+/*!
* \brief Stasis application container.
*/
struct ao2_container *apps_registry;
@@ -89,12 +95,6 @@
struct ao2_container *app_controls;
struct ao2_container *app_bridges;
-
-/*! \brief Message router for the channel caching topic */
-struct stasis_message_router *channel_router;
-
-/*! \brief Message router for the bridge caching topic */
-struct stasis_message_router *bridge_router;
/*! AO2 hash function for \ref app */
static int app_hash(const void *obj, const int flags)
@@ -202,36 +202,6 @@
/*! \brief Typedef for blob handler callbacks */
typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
-
-/*! \brief Callback to check whether an app is watching a given channel */
-static int app_watching_channel_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
- char *uniqueid = arg;
-
- return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified channel */
-static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
-{
- struct ao2_container *watching_apps;
- char *uniqueid_dup;
- RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
- ast_assert(uniqueid != NULL);
-
- uniqueid_dup = ast_strdupa(uniqueid);
-
- watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup);
- watching_apps = watching_apps_iter->c;
-
- if (!ao2_container_count(watching_apps)) {
- return NULL;
- }
-
- ao2_ref(watching_apps, +1);
- return watching_apps_iter->c;
-}
/*! \brief Typedef for callbacks that get called on channel snapshot updates */
typedef struct ast_json *(*channel_snapshot_monitor)(
@@ -354,75 +324,6 @@
channel_dialplan,
channel_callerid
};
-
-static int app_send_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
- struct ast_json *msg = arg;
-
- app_send(app, msg);
- return 0;
-}
-
-static void sub_channel_snapshot_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
- struct stasis_cache_update *update = stasis_message_data(message);
- struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
- struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
- /* Pull timestamp from the new snapshot, or from the update message
- * when there isn't one. */
- const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
- int i;
-
- watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
- if (!watching_apps) {
- return;
- }
-
- for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
- if (msg) {
- ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
- }
- }
-}
-
-static void distribute_message(struct ao2_container *apps, struct ast_json *msg)
-{
- ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static void sub_channel_blob_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
- struct ast_channel_blob *obj = stasis_message_data(message);
-
- if (!obj->snapshot) {
- return;
- }
-
- msg = stasis_message_to_json(message);
- if (!msg) {
- return;
- }
-
- watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
- if (!watching_apps) {
- return;
- }
-
- distribute_message(watching_apps, msg);
-}
/*!
* \brief In addition to running ao2_cleanup(), this function also removes the
@@ -471,7 +372,7 @@
ast_bridge_destroy(bridge);
}
-int app_send_start_msg(struct app *app, struct ast_channel *chan,
+static int send_start_msg(struct app *app, struct ast_channel *chan,
int argc, char *argv[])
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -512,7 +413,7 @@
return 0;
}
-int app_send_end_msg(struct app *app, struct ast_channel *chan)
+static int send_end_msg(struct app *app, struct ast_channel *chan)
{
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -554,6 +455,7 @@
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
int res = 0;
ast_assert(chan != NULL);
@@ -577,15 +479,17 @@
}
ao2_link(app_controls, control);
- res = app_send_start_msg(app, chan, argc, argv);
+ res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending start message to '%s'\n", app_name);
- return res;
- }
-
- if (app_add_channel(app, chan)) {
- ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name);
+ return -1;
+ }
+
+ forwards = app_subscribe_channel(app, chan);
+ if (!forwards) {
+ ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
+ app_name, ast_channel_name(chan));
return -1;
}
@@ -637,8 +541,7 @@
}
}
- app_remove_channel(app, chan);
- res = app_send_end_msg(app, chan);
+ res = send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending end message to %s\n", app_name);
@@ -749,296 +652,28 @@
ast_module_unref(ast_module_info->self);
}
-/*! \brief Callback to check whether an app is watching a given bridge */
-static int app_watching_bridge_cb(void *obj, void *arg, int flags)
-{
- struct app *app = obj;
- char *uniqueid = arg;
-
- return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified bridge */
-static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
-{
- struct ao2_container *watching_apps;
- char *uniqueid_dup;
- RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy);
- ast_assert(uniqueid != NULL);
-
- uniqueid_dup = ast_strdupa(uniqueid);
-
- watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup);
- watching_apps = watching_apps_iter->c;
-
- if (!ao2_container_count(watching_apps)) {
- return NULL;
- }
-
- ao2_ref(watching_apps, +1);
- return watching_apps_iter->c;
-}
-
-/*! Callback used to remove an app's interest in a bridge */
-static int remove_bridge_cb(void *obj, void *arg, int flags)
-{
- app_remove_bridge(obj, arg);
- return 0;
-}
-
-static struct ast_json *simple_bridge_event(
- const char *type,
- struct ast_bridge_snapshot *snapshot,
- const struct timeval *tv)
-{
- return ast_json_pack("{s: s, s: o, s: o}",
- "type", type,
- "timestamp", ast_json_timeval(*tv, NULL),
- "bridge", ast_bridge_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *simple_bridge_channel_event(
- const char *type,
- struct ast_bridge_snapshot *bridge_snapshot,
- struct ast_channel_snapshot *channel_snapshot,
- const struct timeval *tv)
-{
- return ast_json_pack("{s: s, s: o, s: o, s: o}",
- "type", type,
- "timestamp", ast_json_timeval(*tv, NULL),
- "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
- "channel", ast_channel_snapshot_to_json(channel_snapshot));
-}
-
-static void sub_bridge_snapshot_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
- struct stasis_cache_update *update = stasis_message_data(message);
- struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
- struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
- const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message);
-
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid);
- if (!watching_apps || !ao2_container_count(watching_apps)) {
- return;
- }
-
- if (!new_snapshot) {
- RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free);
-
- /* The bridge has gone away. Create the message, make sure no apps are
- * watching this bridge anymore, and destroy the bridge's control
- * structure */
- msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
- ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id);
- stasis_app_bridge_destroy(old_snapshot->uniqueid);
- } else if (!old_snapshot) {
- msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
- }
-
- if (!msg) {
- return;
- }
-
- distribute_message(watching_apps, msg);
-}
-
-/*! \brief Callback used to merge two containers of applications */
-static int list_merge_cb(void *obj, void *arg, int flags)
-{
- /* remove any current entries for this app */
- ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE);
- /* relink as the only entry */
- ao2_link(arg, obj);
- return 0;
-}
-
-/*! \brief Merge container src into container dst without modifying src */
-static void update_apps_list(struct ao2_container *dst, struct ao2_container *src)
-{
- ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
-}
-
-/*! \brief Callback for adding to an app's bridges of interest */
-static int app_add_bridge_cb(void *obj, void *arg, int flags)
-{
- app_add_bridge(obj, arg);
- return 0;
-}
-
-/*! \brief Add interest in the given bridge to all apps in the container */
-static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id)
-{
- RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
- ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
-}
-
-static void sub_bridge_merge_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
- struct ast_bridge_merge_message *merge = stasis_message_data(message);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
- const struct timeval *tv = stasis_message_timestamp(message);
-
- watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
- if (watching_apps_to) {
- update_apps_list(watching_apps_all, watching_apps_to);
- }
-
- watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
- if (watching_apps_from) {
- update_bridge_interest(watching_apps_from, merge->to->uniqueid);
- update_apps_list(watching_apps_all, watching_apps_from);
- }
-
- if (!ao2_container_count(watching_apps_all)) {
- return;
- }
-
- msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
- "type", "BridgeMerged",
- "timestamp", ast_json_timeval(*tv, NULL),
- "bridge", ast_bridge_snapshot_to_json(merge->to),
- "bridge_from", ast_bridge_snapshot_to_json(merge->from));
-
- if (!msg) {
- return;
- }
-
- distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_enter_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
- RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
- struct ast_bridge_blob *obj = stasis_message_data(message);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
- if (watching_apps_bridge) {
- update_apps_list(watching_apps_all, watching_apps_bridge);
- }
-
- watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid);
- if (watching_apps_channel) {
- update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid);
- update_apps_list(watching_apps_all, watching_apps_channel);
- }
-
- if (!ao2_container_count(watching_apps_all)) {
- return;
- }
-
- msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
- obj->channel, stasis_message_timestamp(message));
-
- distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_leave_handler(void *data,
- struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup);
- struct ast_bridge_blob *obj = stasis_message_data(message);
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
- watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
- if (!watching_apps_bridge) {
- return;
- }
-
- msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
- obj->channel, stasis_message_timestamp(message));
-
- distribute_message(watching_apps_bridge, msg);
-}
-
static int load_module(void)
{
- int r = 0;
-
- apps_registry =
- ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
+ apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
+ app_compare);
if (apps_registry == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
- app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
- control_hash, control_compare);
+ app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
+ control_compare);
if (app_controls == NULL) {
return AST_MODULE_LOAD_FAILURE;
}
- app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
- bridges_hash, bridges_compare);
- if (app_bridges == NULL) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
- if (!channel_router) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
- /* TODO: This could be handled a lot better. Instead of subscribing to
- * the one caching topic and filtering out messages by channel id, we
- * should have individual caching topics per-channel, with a shared
- * back-end cache. That would simplify a lot of what's going on right
- * here.
- */
- r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
- r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
- if (r) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
- if (!bridge_router) {
- return AST_MODULE_LOAD_FAILURE;
- }
-
- r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
- r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
- r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
- r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
- if (r) {
- return AST_MODULE_LOAD_FAILURE;
- }
+ app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
+ bridges_compare);
return AST_MODULE_LOAD_SUCCESS;
}
static int unload_module(void)
{
- int r = 0;
-
- stasis_message_router_unsubscribe_and_join(channel_router);
- channel_router = NULL;
-
- stasis_message_router_unsubscribe_and_join(bridge_router);
- bridge_router = NULL;
-
ao2_cleanup(apps_registry);
apps_registry = NULL;
@@ -1048,7 +683,7 @@
ao2_cleanup(app_bridges);
app_bridges = NULL;
- return r;
+ return 0;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
Modified: team/dlee/ASTERISK-21969/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.c?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.c (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.c Wed Aug 7 15:53:51 2013
@@ -30,49 +30,65 @@
#include "app.h"
#include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances. Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances. Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
struct app {
+ /*! Atomic integer counting channels in this app */
+ int channel_count;
+ /*! Aggregation topic for this application. */
+ struct stasis_topic *topic;
+ /*! Router for handling messages forwarded to \a topic. */
+ struct stasis_message_router *router;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
- /*! List of channel identifiers this app instance is interested in */
- struct ao2_container *channels;
- /*! List of bridge identifiers this app instance owns */
- struct ao2_container *bridges;
/*! Name of the Stasis application */
char name[];
};
+struct app_forwards {
+ struct app *app;
+
+ struct stasis_subscription *channel_forward;
+ struct stasis_subscription *channel_cached_forward;
+
+ struct stasis_subscription *bridge_forward;
+ struct stasis_subscription *bridge_cached_forward;
+};
+
static void app_dtor(void *obj)
{
struct app *app = obj;
+
+ ao2_cleanup(app->topic);
+ app->topic = NULL;
ao2_cleanup(app->data);
app->data = NULL;
- ao2_cleanup(app->channels);
- app->channels = NULL;
- ao2_cleanup(app->bridges);
- app->bridges = NULL;
+}
+
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ struct app *app = data;
+ RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+ json = stasis_message_to_json(message);
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
}
struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
size_t size;
+ int res = 0;
ast_assert(name != NULL);
ast_assert(handler != NULL);
@@ -85,76 +101,53 @@
if (!app) {
return NULL;
}
+
+ app->topic = stasis_topic_create(name);
+ if (!app->topic) {
+ return NULL;
+ }
+
+ app->router = stasis_message_router_create(app->topic);
+ if (!app->router) {
+ return NULL;
+ }
+
+ /*
+ res |= stasis_message_router_add(app->router,
+ ast_channel_user_event_type(), sub_channel_blob_handler, app);
+ res |= stasis_message_router_add(app->router,
+ ast_channel_varset_type(), sub_channel_blob_handler, app);
+ res |= stasis_message_router_add(app->router,
+ ast_channel_dtmf_end_type(), sub_channel_blob_handler, app);
+ res |= stasis_message_router_add(app->router,
+ ast_channel_hangup_request_type(), sub_channel_blob_handler,
+ app);
+ res |= stasis_message_router_add(app->router,
+ stasis_cache_update_type(), sub_bridge_snapshot_handler, app);
+ res |= stasis_message_router_add(app->router,
+ ast_bridge_merge_message_type(), sub_bridge_merge_handler,
+ app);
+ res |= stasis_message_router_add(app->router,
+ ast_channel_entered_bridge_type(), sub_bridge_enter_handler,
+ app);
+ res |= stasis_message_router_add(app->router,
+ ast_channel_left_bridge_type(), sub_bridge_leave_handler, app);
+ */
+ res |= stasis_message_router_set_default(app->router,
+ sub_default_handler, app);
+
+ if (res != 0) {
+ return NULL;
+ }
+
strncpy(app->name, name, size - sizeof(*app));
app->handler = handler;
ao2_ref(data, +1);
app->data = data;
- app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
- if (!app->channels) {
- return NULL;
- }
-
- app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS);
- if (!app->bridges) {
- return NULL;
- }
-
ao2_ref(app, +1);
return app;
-}
-
-int app_add_channel(struct app *app, const struct ast_channel *chan)
-{
- SCOPED_AO2LOCK(lock, app);
- const char *uniqueid;
-
- ast_assert(app != NULL);
- ast_assert(chan != NULL);
-
- /* Don't accept new channels in an inactive application */
- if (!app->handler) {
- return -1;
- }
-
- uniqueid = ast_channel_uniqueid(chan);
- return ast_str_container_add(app->channels, uniqueid) ? -1 : 0;
-}
-
-void app_remove_channel(struct app* app, const struct ast_channel *chan)
-{
- SCOPED_AO2LOCK(lock, app);
-
- ast_assert(app != NULL);
- ast_assert(chan != NULL);
-
- ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
-}
-
-int app_add_bridge(struct app *app, const char *uniqueid)
-{
- SCOPED_AO2LOCK(lock, app);
-
- ast_assert(app != NULL);
- ast_assert(uniqueid != NULL);
-
- /* Don't accept new bridges in an inactive application */
- if (!app->handler) {
- return -1;
- }
-
- return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0;
-}
-
-void app_remove_bridge(struct app* app, const char *uniqueid)
-{
- SCOPED_AO2LOCK(lock, app);
-
- ast_assert(app != NULL);
- ast_assert(uniqueid != NULL);
-
- ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE);
}
/*!
@@ -207,7 +200,7 @@
SCOPED_AO2LOCK(lock, app);
return app->handler == NULL &&
- ao2_container_count(app->channels) == 0;
+ ast_atomic_fetchadd_int(&app->channel_count, 0) == 0;
}
void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -243,16 +236,96 @@
return app->name;
}
-int app_is_watching_channel(struct app *app, const char *uniqueid)
-{
- RAII_VAR(char *, found, NULL, ao2_cleanup);
- found = ao2_find(app->channels, uniqueid, OBJ_KEY);
- return found != NULL;
-}
-
-int app_is_watching_bridge(struct app *app, const char *uniqueid)
-{
- RAII_VAR(char *, found, NULL, ao2_cleanup);
- found = ao2_find(app->bridges, uniqueid, OBJ_KEY);
- return found != NULL;
-}
+static void forwards_dtor(void *obj)
+{
+ struct app_forwards *forwards = obj;
+
+ stasis_unsubscribe(forwards->channel_forward);
+ forwards->channel_forward = NULL;
+ stasis_unsubscribe(forwards->channel_cached_forward);
+ forwards->channel_cached_forward = NULL;
+ stasis_unsubscribe(forwards->bridge_forward);
+ forwards->bridge_forward = NULL;
+ stasis_unsubscribe(forwards->bridge_cached_forward);
+ forwards->bridge_cached_forward = NULL;
+
+ ast_atomic_fetchadd_int(&forwards->app->channel_count, -1);
+
+ ao2_cleanup(forwards->app);
+ forwards->app = NULL;
+}
+
+struct app_forwards *app_subscribe_channel(struct app *app,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (!app || !chan) {
+ return NULL;
+ }
+
+ forwards = ao2_alloc(sizeof(*forwards), forwards_dtor);
+ if (!forwards) {
+ return NULL;
+ }
+
+ ao2_ref(app, +1);
+ forwards->app = app;
+
+ forwards->channel_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
+ if (!forwards->channel_forward) {
+ return NULL;
+ }
+
+ forwards->channel_cached_forward = stasis_forward_all(
+ ast_channel_topic_cached(chan), app->topic);
+ if (!forwards->channel_cached_forward) {
+ return NULL;
+ }
+
+ ast_atomic_fetchadd_int(&forwards->app->channel_count, +1);
+ ao2_ref(forwards, +1);
+ return forwards;
+}
+
+int app_subscribe_bridge(struct app_forwards *forwards,
+ struct ast_bridge *bridge)
+{
+ if (!forwards || !bridge) {
+ return -1;
+ }
+
+ ast_assert(forwards->app != NULL);
+
+ app_unsubscribe_bridge(forwards);
+
+ forwards->bridge_forward = stasis_forward_all(
+ ast_bridge_topic(bridge), forwards->app->topic);
+ if (!forwards->bridge_forward) {
+ return -1;
+ }
+
+ forwards->bridge_cached_forward = stasis_forward_all(
+ ast_bridge_topic_cached(bridge), forwards->app->topic);
+ if (!forwards->bridge_cached_forward) {
+ /* Probably a bad idea to stay half-subscribed */
+ stasis_unsubscribe(forwards->bridge_forward);
+ forwards->bridge_forward = NULL;
+ return -1;
+ }
+
+ return 0;
+}
+
+void app_unsubscribe_bridge(struct app_forwards *forwards)
+{
+ if (!forwards) {
+ return;
+ }
+
+ stasis_unsubscribe(forwards->bridge_forward);
+ forwards->bridge_forward = NULL;
+ stasis_unsubscribe(forwards->bridge_cached_forward);
+ forwards->bridge_cached_forward = NULL;
+}
Modified: team/dlee/ASTERISK-21969/res/stasis/app.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.h?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.h (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.h Wed Aug 7 15:53:51 2013
@@ -96,17 +96,6 @@
const char *app_name(const struct app *app);
/*!
- * \brief Subscribe an application to a topic.
- *
- * \param app Application.
- * \param topic Topic to subscribe to.
- * \return New subscription.
- * \return \c NULL on error.
- */
-struct stasis_subscription *app_subscribe(struct app *app,
- struct stasis_topic *topic);
-
-/*!
* \brief Send a message to an application.
*
* \param app Application.
@@ -114,83 +103,41 @@
*/
void app_send(struct app *app, struct ast_json *message);
+struct app_forwards;
+
/*!
- * \brief Send the start message to an application.
+ * \brief Subscribes an application to a channel.
+ *
+ * The returned object is AO2 managed, and should be ao2_cleanup()'ed to kill
+ * the subscriptions.
*
* \param app Application.
- * \param chan The channel entering the application.
- * \param argc The number of arguments for the application.
- * \param argv The arguments for the application.
+ * \param chan Channel to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc,
- char *argv[]);
+struct app_forwards *app_subscribe_channel(struct app *app,
+ struct ast_channel *chan);
/*!
- * \brief Send the end message to an application.
+ * \brief Add a bridge subscription to an existing channel subscription.
*
- * \param app Application.
- * \param chan The channel leaving the application.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_send_end_msg(struct app *app, struct ast_channel *chan);
+int app_subscribe_bridge(struct app_forwards *forwards,
+ struct ast_bridge *bridge);
/*!
- * \brief Checks if an application is watching a given channel.
+ * \brief Cancel the bridge subscription for an application.
*
- * \param app Application.
- * \param uniqueid Uniqueid of the channel to check about.
- * \return True (non-zero) if \a app is watching channel with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_channel(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Add a channel to an application's watch list.
- *
- * \param app Application.
- * \param chan Channel to watch.
+ * \param forwards Return from app_subscribe_channel().
+ * Does nothing when \a forwards is \c NULL.
* \return 0 on success.
* \return Non-zero on error.
*/
-int app_add_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Remove a channel from an application's watch list.
- *
- * \param app Application.
- * \param chan Channel to watch.
- */
-void app_remove_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Add a bridge to an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to watch.
- * \return 0 on success.
- * \return Non-zero on error.
- */
-int app_add_bridge(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Remove a bridge from an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to remove.
- */
-void app_remove_bridge(struct app* app, const char *uniqueid);
-
-/*!
- * \brief Checks if an application is watching a given bridge.
- *
- * \param app Application.
- * \param uniqueid Uniqueid of the bridge to check.
- * \return True (non-zero) if \a app is watching bridge with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_bridge(struct app *app, const char *uniqueid);
+void app_unsubscribe_bridge(struct app_forwards *forwards);
#endif /* _ASTERISK_RES_STASIS_APP_H */
More information about the svn-commits
mailing list