[svn-commits] mjordan: branch 12 r418089 - in /branches/12: include/asterisk/ res/ res/ari/...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Sun Jul 6 21:13:18 CDT 2014
Author: mjordan
Date: Sun Jul 6 21:13:13 2014
New Revision: 418089
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=418089
Log:
ARI/res_stasis: Subscribe to both Local channel halves when originating to app
This patch fixes two bugs:
1. When originating a channel into a Stasis application, we already create a
subscription for the channel that is going into our Stasis app.
Unfortunately, when you create a Local channel and pass it off to a Stasis
app, you really aren't creating just one channel: you're creating two. This
patch snags the second half of the Local channel pair (assuming it is a
Local channel pair, but luckily core_local is kind about such assumptions)
and subscribes to it as well.
2. Subscriptions are a bit sticky right now. If a subscription is made, the
'interest' count gets bumped on the Stasis subscription - but unless
something explicitly unsubscribes the channel, said subscription sticks
around. This is not much of a problem is a user is creating the subscription
- if they made it, they must want it. However, when we are creating
implicit subscriptions, we need to make sure something clears them out.
This patch takes a pessimistic approach: it watches the cache updates
coming from Stasis and, if we notice that the cache just cleared out an
object, we delete our subscription object. This keeps our ao2 container of
Stasis forwards in an application from growing out of hand; it also is a
bit more forgiving for end users who may not realize they were supposed to
unsubscribe from that channel that just hung up.
Review: https://reviewboard.asterisk.org/r/3710/
ASTERISK-23939 #close
Modified:
branches/12/include/asterisk/stasis_app.h
branches/12/res/ari/resource_channels.c
branches/12/res/res_stasis.c
branches/12/res/stasis/app.c
Modified: branches/12/include/asterisk/stasis_app.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis_app.h?view=diff&rev=418089&r1=418088&r2=418089
==============================================================================
--- branches/12/include/asterisk/stasis_app.h (original)
+++ branches/12/include/asterisk/stasis_app.h Sun Jul 6 21:13:13 2014
@@ -297,6 +297,21 @@
const char **event_source_uris, int event_sources_count,
struct ast_json **json);
+/*!
+ * \brief Directly subscribe an application to a channel
+ *
+ * \param app_name Name of the application to subscribe.
+ * \param chan The channel to subscribe to
+ *
+ * \return \ref stasis_app_subscribe_res return code.
+ *
+ * \note This method can be used when you already hold a channel and its
+ * lock. This bypasses the channel lookup that would normally be
+ * performed by \ref stasis_app_subscribe.
+ */
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+ struct ast_channel *chan);
+
/*! @} */
/*! @{ */
Modified: branches/12/res/ari/resource_channels.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/ari/resource_channels.c?view=diff&rev=418089&r1=418088&r2=418089
==============================================================================
--- branches/12/res/ari/resource_channels.c (original)
+++ branches/12/res/ari/resource_channels.c Sun Jul 6 21:13:13 2014
@@ -42,6 +42,7 @@
#include "asterisk/stasis_app_snoop.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/causes.h"
+#include "asterisk/core_local.h"
#include "resource_channels.h"
#include <limits.h>
@@ -775,6 +776,7 @@
struct ast_format tmp_fmt;
char *stuff;
struct ast_channel *chan;
+ struct ast_channel *local_peer;
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_assigned_ids assignedids = {
.uniqueid = args_channel_id,
@@ -859,20 +861,24 @@
return;
}
+ /* See if this is a Local channel and if so, get the peer */
+ local_peer = ast_local_get_peer(chan);
+
+ if (!ast_strlen_zero(args_app)) {
+ stasis_app_subscribe_channel(args_app, chan);
+ if (local_peer) {
+ stasis_app_subscribe_channel(args_app, local_peer);
+ }
+ }
+
snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
ast_channel_unlock(chan);
- if (!ast_strlen_zero(args_app)) {
- /* channel: + channel ID + null terminator */
- char uri[9 + strlen(ast_channel_uniqueid(chan))];
- const char *uris[1] = { uri, };
-
- sprintf(uri, "channel:%s", ast_channel_uniqueid(chan));
- stasis_app_subscribe(args_app, uris, 1, NULL);
- }
-
ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL));
ast_channel_unref(chan);
+ if (local_peer) {
+ ast_channel_unref(local_peer);
+ }
}
void ast_ari_channels_originate_with_id(struct ast_variable *headers,
Modified: branches/12/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_stasis.c?view=diff&rev=418089&r1=418088&r2=418089
==============================================================================
--- branches/12/res/res_stasis.c (original)
+++ branches/12/res/res_stasis.c Sun Jul 6 21:13:13 2014
@@ -1225,6 +1225,29 @@
return STASIS_ASR_OK;
}
+enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
+ struct ast_channel *chan)
+{
+ RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
+ int res;
+
+ if (!app) {
+ return STASIS_ASR_APP_NOT_FOUND;
+ }
+
+ ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
+
+ res = app_subscribe_channel(app, chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+ app_name, ast_channel_uniqueid(chan));
+ return STASIS_ASR_INTERNAL_ERROR;
+ }
+
+ return STASIS_ASR_OK;
+}
+
+
/*!
* \internal
* \brief Subscribe an app to an event source.
Modified: branches/12/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/stasis/app.c?view=diff&rev=418089&r1=418088&r2=418089
==============================================================================
--- branches/12/res/stasis/app.c (original)
+++ branches/12/res/stasis/app.c Sun Jul 6 21:13:13 2014
@@ -35,6 +35,8 @@
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
/*! Aggregation topic for this application. */
@@ -449,7 +451,7 @@
static channel_snapshot_monitor channel_monitors[] = {
channel_state,
channel_dialplan,
- channel_callerid
+ channel_callerid,
};
static void sub_channel_update_handler(void *data,
@@ -486,6 +488,10 @@
app_send(app, msg);
}
}
+
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+ }
}
static struct ast_json *simple_endpoint_event(
@@ -513,6 +519,7 @@
struct stasis_app *app = data;
struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot;
+ struct ast_endpoint_snapshot *old_snapshot;
const struct timeval *tv;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
@@ -522,17 +529,22 @@
ast_assert(update->type == ast_endpoint_snapshot_type());
new_snapshot = stasis_message_data(update->new_snapshot);
- tv = update->new_snapshot ?
- stasis_message_timestamp(update->new_snapshot) :
- stasis_message_timestamp(message);
-
- json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
-
- if (!json) {
- return;
- }
-
- app_send(app, json);
+ old_snapshot = stasis_message_data(update->old_snapshot);
+
+ if (new_snapshot) {
+ tv = stasis_message_timestamp(update->new_snapshot);
+
+ json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
+ if (!json) {
+ return;
+ }
+
+ app_send(app, json);
+ }
+
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "endpoint", old_snapshot->id, 1);
+ }
}
static struct ast_json *simple_bridge_event(
@@ -580,11 +592,13 @@
json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
}
- if (!json) {
- return;
- }
-
- app_send(app, json);
+ if (json) {
+ app_send(app, json);
+ }
+
+ if (!new_snapshot && old_snapshot) {
+ unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+ }
}
@@ -982,7 +996,7 @@
return app_subscribe_channel(app, obj);
}
-static int unsubscribe(struct stasis_app *app, const char *kind, const char *id)
+static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
@@ -997,7 +1011,7 @@
forwards->interested--;
ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
- if (forwards->interested == 0) {
+ if (forwards->interested == 0 || terminate) {
/* No one is interested any more; unsubscribe */
ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
forwards_unsubscribe(forwards);
@@ -1024,7 +1038,7 @@
return -1;
}
- return unsubscribe(app, "channel", channel_id);
+ return unsubscribe(app, "channel", channel_id, 0);
}
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
@@ -1093,7 +1107,7 @@
return -1;
}
- return unsubscribe(app, "bridge", bridge_id);
+ return unsubscribe(app, "bridge", bridge_id, 0);
}
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
@@ -1153,7 +1167,7 @@
return -1;
}
- return unsubscribe(app, "endpoint", endpoint_id);
+ return unsubscribe(app, "endpoint", endpoint_id, 0);
}
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
More information about the svn-commits
mailing list