[asterisk-commits] kmoore: branch 12 r427788 - in /branches/12: include/asterisk/ res/ res/stasis/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Nov 13 09:42:42 CST 2014
Author: kmoore
Date: Thu Nov 13 09:42:28 2014
New Revision: 427788
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=427788
Log:
Stasis: Fix StasisEnd message ordering
This change corrects message ordering in cases where a channel-related
message can be received after a Stasis/ARI application has received the
StasisEnd message. The StasisEnd message was being passed to
applications directly without waiting for the channel topic to empty.
As a result of this fix, other bugs were also identified and fixed:
* StasisStart messages were also being sent directly to apps and are
now routed through the stasis message bus properly
* Masquerade monitor datastores were being removed at the incorrect
time in some cases and were causing StasisEnd messages to not be sent
* General refactoring where necessary for the above
* Unsubscription on StasisEnd timing changes to prevent additional
messages from following the StasisEnd when they shouldn't
A channel sanitization function pointer was added to reduce processing
and AO2 lookups.
Review: https://reviewboard.asterisk.org/r/4163/
ASTERISK-24501 #close
Reported by: Matt Jordan
Modified:
branches/12/include/asterisk/stasis.h
branches/12/include/asterisk/stasis_app.h
branches/12/res/res_stasis.c
branches/12/res/stasis/app.c
branches/12/res/stasis/app.h
branches/12/res/stasis/stasis_bridge.c
Modified: branches/12/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis.h?view=diff&rev=427788&r1=427787&r2=427788
==============================================================================
--- branches/12/include/asterisk/stasis.h (original)
+++ branches/12/include/asterisk/stasis.h Thu Nov 13 09:42:28 2014
@@ -221,6 +221,17 @@
* \retval zero if the channel should remain in the message
*/
int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
+
+ /*!
+ * \brief Callback which determines whether a channel should be sanitized from
+ * a message based on the channel
+ *
+ * \param chan The channel to be checked
+ *
+ * \retval non-zero if the channel should be left out of the message
+ * \retval zero if the channel should remain in the message
+ */
+ int (*channel)(const struct ast_channel *chan);
};
/*!
Modified: branches/12/include/asterisk/stasis_app.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis_app.h?view=diff&rev=427788&r1=427787&r2=427788
==============================================================================
--- branches/12/include/asterisk/stasis_app.h (original)
+++ branches/12/include/asterisk/stasis_app.h Thu Nov 13 09:42:28 2014
@@ -800,11 +800,6 @@
struct stasis_message_sanitizer *stasis_app_get_sanitizer(void);
/*!
- * \brief Stasis message type for a StasisEnd event
- */
-struct stasis_message_type *ast_stasis_end_message_type(void);
-
-/*!
* \brief Indicate that this channel has had a StasisEnd published for it
*
* \param The channel that is exiting Stasis.
Modified: branches/12/res/res_stasis.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/res_stasis.c?view=diff&rev=427788&r1=427787&r2=427788
==============================================================================
--- branches/12/res/res_stasis.c (original)
+++ branches/12/res/res_stasis.c Thu Nov 13 09:42:28 2014
@@ -108,30 +108,68 @@
struct ao2_container *app_bridges_playback;
-static struct ast_json *stasis_end_json_payload(struct ast_channel_snapshot *snapshot,
+static struct ast_json *stasis_end_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
+ struct ast_channel_blob *payload = stasis_message_data(message);
+
+ if (sanitize && sanitize->channel_snapshot &&
+ sanitize->channel_snapshot(payload->snapshot)) {
+ return NULL;
+ }
+
return ast_json_pack("{s: s, s: o, s: o}",
"type", "StasisEnd",
"timestamp", ast_json_timeval(ast_tvnow(), NULL),
- "channel", ast_channel_snapshot_to_json(snapshot, sanitize));
-}
-
-static struct ast_json *stasis_end_to_json(struct stasis_message *message,
+ "channel", ast_channel_snapshot_to_json(payload->snapshot, sanitize));
+}
+
+STASIS_MESSAGE_TYPE_DEFN(app_end_message_type,
+ .to_json = stasis_end_to_json);
+
+struct start_message_blob {
+ struct ast_channel_snapshot *channel; /*!< Channel that is entering Stasis() */
+ struct ast_channel_snapshot *replace_channel; /*!< Channel that is being replaced (optional) */
+ struct ast_json *blob; /*!< JSON blob containing timestamp and args */
+};
+
+static struct ast_json *stasis_start_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
- struct ast_channel_blob *payload = stasis_message_data(message);
+ struct start_message_blob *payload = stasis_message_data(message);
+ struct ast_json *msg;
if (sanitize && sanitize->channel_snapshot &&
- sanitize->channel_snapshot(payload->snapshot)) {
- return NULL;
- }
-
- return stasis_end_json_payload(payload->snapshot, sanitize);
-}
-
-STASIS_MESSAGE_TYPE_DEFN(ast_stasis_end_message_type,
- .to_json = stasis_end_to_json);
+ sanitize->channel_snapshot(payload->channel)) {
+ return NULL;
+ }
+
+ msg = ast_json_pack("{s: s, s: O, s: O, s: o}",
+ "type", "StasisStart",
+ "timestamp", ast_json_object_get(payload->blob, "timestamp"),
+ "args", ast_json_object_get(payload->blob, "args"),
+ "channel", ast_channel_snapshot_to_json(payload->channel, NULL));
+ if (!msg) {
+ ast_log(LOG_ERROR, "Failed to pack JSON for StasisStart message\n");
+ return NULL;
+ }
+
+ if (payload->replace_channel) {
+ int res = ast_json_object_set(msg, "replace_channel",
+ ast_channel_snapshot_to_json(payload->replace_channel, NULL));
+
+ if (res) {
+ ast_json_unref(msg);
+ ast_log(LOG_ERROR, "Failed to append JSON for StasisStart message\n");
+ return NULL;
+ }
+ }
+
+ return msg;
+}
+
+STASIS_MESSAGE_TYPE_DEFN_LOCAL(start_message_type,
+ .to_json = stasis_start_to_json);
const char *stasis_app_name(const struct stasis_app *app)
{
@@ -862,51 +900,64 @@
return replace_channel_app;
}
-static int send_start_msg_snapshots(struct stasis_app *app,
+static void start_message_blob_dtor(void *obj)
+{
+ struct start_message_blob *payload = obj;
+
+ ao2_cleanup(payload->channel);
+ ao2_cleanup(payload->replace_channel);
+ ast_json_unref(payload->blob);
+}
+
+static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app *app,
int argc, char *argv[], struct ast_channel_snapshot *snapshot,
struct ast_channel_snapshot *replace_channel_snapshot)
{
- RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+ RAII_VAR(struct ast_json *, json_blob, NULL, ast_json_unref);
struct ast_json *json_args;
- struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
+ RAII_VAR(struct start_message_blob *, payload, NULL, ao2_cleanup);
+ struct stasis_message *msg;
int i;
- if (sanitize && sanitize->channel_snapshot
- && sanitize->channel_snapshot(snapshot)) {
- return 0;
- }
-
- msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
- "type", "StasisStart",
+ payload = ao2_alloc(sizeof(*payload), start_message_blob_dtor);
+ if (!payload) {
+ ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
+ return -1;
+ }
+
+ payload->channel = ao2_bump(snapshot);
+ payload->replace_channel = ao2_bump(replace_channel_snapshot);
+
+ json_blob = ast_json_pack("{s: o, s: []}",
"timestamp", ast_json_timeval(ast_tvnow(), NULL),
- "args",
- "channel", ast_channel_snapshot_to_json(snapshot, NULL));
- if (!msg) {
- return -1;
- }
-
- if (replace_channel_snapshot) {
- int res = ast_json_object_set(msg, "replace_channel",
- ast_channel_snapshot_to_json(replace_channel_snapshot, NULL));
-
- if (res) {
- return -1;
- }
+ "args");
+ if (!json_blob) {
+ ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
+ return -1;
}
/* Append arguments to args array */
- json_args = ast_json_object_get(msg, "args");
+ json_args = ast_json_object_get(json_blob, "args");
ast_assert(json_args != NULL);
for (i = 0; i < argc; ++i) {
int r = ast_json_array_append(json_args,
ast_json_string_create(argv[i]));
if (r != 0) {
- ast_log(LOG_ERROR, "Error appending start message\n");
+ ast_log(LOG_ERROR, "Error appending to StasisStart message\n");
return -1;
}
}
- app_send(app, msg);
+ payload->blob = ast_json_ref(json_blob);
+
+ msg = stasis_message_create(start_message_type(), payload);
+ if (!msg) {
+ ast_log(LOG_ERROR, "Error sending StasisStart message\n");
+ return -1;
+ }
+
+ stasis_publish(ast_channel_topic(chan), msg);
+ ao2_ref(msg, -1);
return 0;
}
@@ -928,30 +979,35 @@
if (!snapshot) {
return -1;
}
- return send_start_msg_snapshots(app, argc, argv, snapshot, replace_channel_snapshot);
-}
-
-static int send_end_msg_snapshot(struct stasis_app *app, struct ast_channel_snapshot *snapshot)
+ return send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot);
+}
+
+static void remove_masquerade_store(struct ast_channel *chan);
+
+int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
{
struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
- struct ast_json *msg;
-
- if (sanitize && sanitize->channel_snapshot
- && sanitize->channel_snapshot(snapshot)) {
+ struct ast_json *blob;
+
+ if (sanitize && sanitize->channel
+ && sanitize->channel(chan)) {
return 0;
}
- msg = stasis_end_json_payload(snapshot, sanitize);
- if (!msg) {
- return -1;
- }
-
- app_send(app, msg);
- ast_json_unref(msg);
+ blob = ast_json_pack("{s: s}", "app", app_name(app));
+ if (!blob) {
+ ast_log(LOG_ERROR, "Error packing JSON for StasisEnd message\n");
+ return -1;
+ }
+
+ stasis_app_channel_set_stasis_end_published(chan);
+ remove_masquerade_store(chan);
+ ast_channel_publish_blob(chan, app_end_message_type(), blob);
+
+ ast_json_unref(blob);
+
return 0;
}
-
-static void remove_masquerade_store(struct ast_channel *chan);
static int masq_match_cb(void *obj, void *data, int flags)
{
@@ -968,32 +1024,22 @@
static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
{
- struct ast_channel_snapshot *snapshot;
struct stasis_app_control *control;
-
- /* grab a snapshot */
- snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
- if (!snapshot) {
- ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
- return;
- }
/* find control */
control = ao2_callback(app_controls, 0, masq_match_cb, old_chan);
if (!control) {
ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
- ao2_cleanup(snapshot);
return;
}
/* send the StasisEnd message to the app */
- send_end_msg_snapshot(control_app(control), snapshot);
+ app_send_end_msg(control_app(control), new_chan);
/* remove the datastore */
remove_masquerade_store(old_chan);
ao2_cleanup(control);
- ao2_cleanup(snapshot);
}
static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
@@ -1032,10 +1078,10 @@
/* send the StasisStart with replace_channel to the app */
- send_start_msg_snapshots(control_app(control), 0, NULL, new_snapshot,
+ send_start_msg_snapshots(new_chan, control_app(control), 0, NULL, new_snapshot,
old_snapshot);
/* send the StasisEnd message to the app */
- send_end_msg_snapshot(control_app(control), old_snapshot);
+ app_send_end_msg(control_app(control), old_chan);
/* fixup channel topic forwards */
if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) {
@@ -1090,33 +1136,6 @@
ast_datastore_free(datastore);
}
-static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
-{
- struct ast_channel_snapshot *snapshot;
- int res = 0;
-
- ast_assert(chan != NULL);
-
- /* A masquerade has occurred and this message will be wrong so it
- * has already been sent elsewhere. */
- if (!has_masquerade_store(chan)) {
- return 0;
- }
-
- /* Set channel info */
- snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
- if (!snapshot) {
- return -1;
- }
-
- if (send_end_msg_snapshot(app, snapshot)) {
- res = -1;
- }
-
- ao2_cleanup(snapshot);
- return res;
-}
-
void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
{
while (!control_is_done(control)) {
@@ -1232,18 +1251,18 @@
return -1;
}
+ res = app_subscribe_channel(app, chan);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
+ app_name, ast_channel_name(chan));
+ remove_masquerade_store(chan);
+ return -1;
+ }
+
res = send_start_msg(app, chan, argc, argv);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending start message to '%s'\n", app_name);
- remove_masquerade_store(chan);
- return -1;
- }
-
- res = app_subscribe_channel(app, chan);
- if (res != 0) {
- ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
- app_name, ast_channel_name(chan));
remove_masquerade_store(chan);
return -1;
}
@@ -1327,9 +1346,9 @@
/* Only publish a stasis_end event if it hasn't already been published */
if (!stasis_app_channel_is_stasis_end_published(chan)) {
- app_unsubscribe_channel(app, chan);
- res = send_end_msg(app, chan);
- remove_masquerade_store(chan);
+ /* A masquerade has occurred and this message will be wrong so it
+ * has already been sent elsewhere. */
+ res = has_masquerade_store(chan) && app_send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR,
"Error sending end message to %s\n", app_name);
@@ -1849,15 +1868,8 @@
ast_module_unref(ast_module_info->self);
}
-/*!
- * \brief Subscription to StasisEnd events
- */
-struct stasis_subscription *stasis_end_sub;
-
static int unload_module(void)
{
- stasis_end_sub = stasis_unsubscribe(stasis_end_sub);
-
stasis_app_unregister_event_sources();
messaging_cleanup();
@@ -1878,7 +1890,8 @@
ao2_cleanup(app_bridges_playback);
app_bridges_playback = NULL;
- STASIS_MESSAGE_TYPE_CLEANUP(ast_stasis_end_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(app_end_message_type);
+ STASIS_MESSAGE_TYPE_CLEANUP(start_message_type);
return 0;
}
@@ -1887,6 +1900,15 @@
static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
{
if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
+ return 0;
+ }
+ return 1;
+}
+
+/* \brief Sanitization callback for channels */
+static int channel_sanitizer(const struct ast_channel *chan)
+{
+ if (!chan || !(ast_channel_tech(chan)->properties & AST_CHAN_TP_INTERNAL)) {
return 0;
}
return 1;
@@ -1904,6 +1926,7 @@
struct stasis_message_sanitizer app_sanitizer = {
.channel_id = channel_id_sanitizer,
.channel_snapshot = channel_snapshot_sanitizer,
+ .channel = channel_sanitizer,
};
struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
@@ -1911,21 +1934,7 @@
return &app_sanitizer;
}
-static void remove_masquerade_store_by_name(const char *channel_name)
-{
- struct ast_channel *chan;
-
- chan = ast_channel_get_by_name(channel_name);
- if (!chan) {
- return;
- }
-
- remove_masquerade_store(chan);
- ast_channel_unref(chan);
-}
-
-static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
- struct stasis_message *message)
+void app_end_message_handler(struct stasis_message *message)
{
struct ast_channel_blob *payload;
struct ast_channel_snapshot *snapshot;
@@ -1934,10 +1943,6 @@
size_t alloc_size;
const char *channels[1];
- if (stasis_message_type(message) != ast_stasis_end_message_type()) {
- return;
- }
-
payload = stasis_message_data(message);
snapshot = payload->snapshot;
app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app"));
@@ -1949,8 +1954,6 @@
channels[0] = channel_uri;
stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL);
-
- remove_masquerade_store_by_name(snapshot->name);
}
static const struct ast_datastore_info stasis_internal_channel_info = {
@@ -2023,7 +2026,10 @@
static int load_module(void)
{
- if (STASIS_MESSAGE_TYPE_INIT(ast_stasis_end_message_type) != 0) {
+ if (STASIS_MESSAGE_TYPE_INIT(start_message_type) != 0) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+ if (STASIS_MESSAGE_TYPE_INIT(app_end_message_type) != 0) {
return AST_MODULE_LOAD_DECLINE;
}
apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
@@ -2049,12 +2055,6 @@
stasis_app_register_event_sources();
- stasis_end_sub = stasis_subscribe(ast_channel_topic_all(), check_for_stasis_end, NULL);
- if (!stasis_end_sub) {
- unload_module();
- return AST_MODULE_LOAD_DECLINE;
- }
-
return AST_MODULE_LOAD_SUCCESS;
}
Modified: branches/12/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/stasis/app.c?view=diff&rev=427788&r1=427787&r2=427788
==============================================================================
--- branches/12/res/stasis/app.c (original)
+++ branches/12/res/stasis/app.c Thu Nov 13 09:42:28 2014
@@ -300,6 +300,10 @@
if (stasis_message_type(message) == ast_channel_dial_type()) {
call_forwarded_handler(app, message);
+ }
+
+ if (stasis_message_type(message) == app_end_message_type()) {
+ app_end_message_handler(message);
}
/* By default, send any message that has a JSON representation */
Modified: branches/12/res/stasis/app.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/stasis/app.h?view=diff&rev=427788&r1=427787&r2=427788
==============================================================================
--- branches/12/res/stasis/app.h (original)
+++ branches/12/res/stasis/app.h Thu Nov 13 09:42:28 2014
@@ -270,4 +270,27 @@
*/
int app_replace_channel_forwards(struct stasis_app *app, const char *old_id, struct ast_channel *new_chan);
+/*!
+ * \brief Send StasisEnd message to the listening app
+ *
+ * \param app The app that owns the channel
+ * \param chan The channel for which the message is being sent
+ *
+ * \retval zero on success
+ * \return non-zero on failure
+ */
+int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan);
+
+/*!
+ * \brief Handle cleanup related to StasisEnd messages
+ *
+ * \param message The message for which to clean up
+ */
+void app_end_message_handler(struct stasis_message *message);
+
+/*!
+ * \brief Accessor for the StasisEnd message type
+ */
+struct stasis_message_type *app_end_message_type(void);
+
#endif /* _ASTERISK_RES_STASIS_APP_H */
Modified: branches/12/res/stasis/stasis_bridge.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/res/stasis/stasis_bridge.c?view=diff&rev=427788&r1=427787&r2=427788
==============================================================================
--- branches/12/res/stasis/stasis_bridge.c (original)
+++ branches/12/res/stasis/stasis_bridge.c Thu Nov 13 09:42:28 2014
@@ -164,7 +164,6 @@
{
if (src->v_table == &bridge_stasis_v_table &&
dst->v_table != &bridge_stasis_v_table) {
- RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
struct ast_channel *chan;
@@ -176,11 +175,7 @@
return -1;
}
- blob = ast_json_pack("{s: s}", "app", app_name(control_app(control)));
-
- stasis_app_channel_set_stasis_end_published(chan);
-
- ast_channel_publish_blob(chan, ast_stasis_end_message_type(), blob);
+ app_send_end_msg(control_app(control), chan);
}
return -1;
More information about the asterisk-commits
mailing list