[asterisk-commits] kmoore: branch kmoore/stasis-bridging-channel_events r384950 - in /team/kmoor...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Apr 8 11:12:26 CDT 2013
Author: kmoore
Date: Mon Apr 8 11:12:23 2013
New Revision: 384950
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=384950
Log:
commit stasis-channel_events work up to this point
Modified:
team/kmoore/stasis-bridging-channel_events/apps/app_stasis.c
team/kmoore/stasis-bridging-channel_events/include/asterisk/strings.h
team/kmoore/stasis-bridging-channel_events/main/strings.c
Modified: team/kmoore/stasis-bridging-channel_events/apps/app_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridging-channel_events/apps/app_stasis.c?view=diff&rev=384950&r1=384949&r2=384950
==============================================================================
--- team/kmoore/stasis-bridging-channel_events/apps/app_stasis.c (original)
+++ team/kmoore/stasis-bridging-channel_events/apps/app_stasis.c Mon Apr 8 11:12:23 2013
@@ -37,7 +37,9 @@
#include "asterisk/channel.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
#include "asterisk/strings.h"
+#include "asterisk/callerid.h"
/*** DOCUMENTATION
<application name="Stasis" language="en_US">
@@ -61,6 +63,9 @@
/*! \brief Maximum number of arguments for the Stasis dialplan application */
#define MAX_ARGS 128
+/*! \brief Number of buckets for the channels container for app instances */
+#define APP_CHANNELS_BUCKETS 7
+
/*! \brief Dialplan application name */
static const char *stasis = "Stasis";
@@ -83,6 +88,8 @@
struct ao2_container *__apps_registry;
struct ao2_container *__app_controls;
+
+struct stasis_message_router *app_channel_router;
/*! Ref-counting accessor for the stasis applications container */
static struct ao2_container *apps_registry(void)
@@ -102,6 +109,8 @@
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
+ /*! List of channel identifiers this app instance is interested in */
+ struct ao2_container *channels;
/*! Name of the Stasis application */
char name[];
};
@@ -112,12 +121,14 @@
ao2_cleanup(app->data);
app->data = NULL;
+ ao2_cleanup(app->channels);
+ app->channels = NULL;
}
/*! Constructor for \ref app. */
static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
- struct app *app;
+ RAII_VAR(struct app *, app, NULL, ao2_cleanup);
size_t size;
ast_assert(name != NULL);
@@ -135,6 +146,12 @@
ao2_ref(data, +1);
app->data = data;
+ app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
+ if (!app->channels) {
+ return NULL;
+ }
+
+ ao2_ref(app, +1);
return app;
}
@@ -159,6 +176,34 @@
} else {
return 0;
}
+}
+
+/*!
+ * \brief Add a channel's unique id to the set of channels an app is watching.
+ *
+ * \param app App instance that should start receiving events for the channel.
+ * \param chan Channel whose unique id is recorded.
+ * \retval 0 on success.
+ * \retval -1 if the id could not be allocated or linked into the container.
+ */
+static int app_add_channel(struct app *app, const struct ast_channel *chan)
+{
+	RAII_VAR(char *, ao2_channel_id, NULL, ao2_cleanup);
+	const char *uniqueid;
+	ast_assert(chan != NULL);
+	ast_assert(app != NULL);
+
+	uniqueid = ast_channel_uniqueid(chan);
+
+	ao2_channel_id = ao2_alloc(strlen(uniqueid) + 1, NULL);
+	if (!ao2_channel_id) {
+		return -1;
+	}
+
+	/* safe strcpy: the allocation above is sized for uniqueid plus its terminator */
+	strcpy(ao2_channel_id, uniqueid);
+	if (!ao2_link(app->channels, ao2_channel_id)) {
+		/* Report the failure instead of pretending the channel is being watched */
+		return -1;
+	}
+	return 0;
+}
+
+/*! \brief Unlink a channel's unique id from the set of channels an app is watching */
+static void app_remove_channel(struct app* app, const struct ast_channel *chan)
+{
+	const char *watched_id;
+
+	ast_assert(chan != NULL);
+	ast_assert(app != NULL);
+
+	watched_id = ast_channel_uniqueid(chan);
+	ao2_find(app->channels, watched_id, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK);
}
/*!
@@ -331,25 +376,249 @@
return 0;
}
-static void sub_handler(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
- struct stasis_message *message)
-{
- struct app *app = data;
- if (ast_channel_snapshot_type() == stasis_message_type(message)) {
+/*! \brief ao2 callback: match apps whose channel list contains the snapshot's unique id */
+static int app_watching_channel_cb(void *obj, void *arg, int flags)
+{
+	struct app *candidate = obj;
+	struct ast_channel_snapshot *snapshot = arg;
+	char *found;
+
+	found = ao2_find(candidate->channels, snapshot->uniqueid, OBJ_KEY);
+	if (!found) {
+		return 0;
+	}
+
+	/* Only the presence of the id matters; drop the reference ao2_find returned */
+	ao2_cleanup(found);
+	return CMP_MATCH;
+}
+
+/*!
+ * \brief Collect the apps that are watching the channel described by a snapshot.
+ *
+ * \param snapshot Channel snapshot whose unique id is looked up in each app.
+ * \return Container of matching apps, with a reference the caller must release.
+ * \return NULL if no app is watching the channel or the search could not be run.
+ */
+static struct ao2_container *get_watching_apps(struct ast_channel_snapshot *snapshot)
+{
+	RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
+	struct ao2_container *watching_apps = NULL;
+	struct ao2_iterator *watching_apps_iter;
+	ast_assert(snapshot != NULL);
+	ast_assert(apps != NULL);
+
+	watching_apps_iter = ao2_callback(apps, OBJ_MULTIPLE, app_watching_channel_cb, snapshot);
+	if (!watching_apps_iter) {
+		/* The multi-object search failed; there is no iterator to read or destroy */
+		return NULL;
+	}
+
+	if (ao2_container_count(watching_apps_iter->c)) {
+		/* Take our own reference before the iterator releases its reference */
+		watching_apps = watching_apps_iter->c;
+		ao2_ref(watching_apps, +1);
+	}
+
+	ao2_iterator_destroy(watching_apps_iter);
+	return watching_apps;
+}
+
+/*!
+ * \brief Typedef for callbacks that get called on channel snapshot updates
+ *
+ * \param old_snapshot Snapshot before the update; the monitors in this file
+ * accept NULL here
+ * \param new_snapshot Snapshot after the update; the monitors in this file
+ * accept NULL here
+ * \return JSON event message, or NULL when the monitor has nothing to report
+ */
+typedef struct ast_json *(*snapshot_monitor)(
+ struct ast_channel_snapshot *old_snapshot,
+ struct ast_channel_snapshot *new_snapshot);
+
+/*!
+ * \brief Build the create, destroy or state-change event for a snapshot update
+ *
+ * \return Event message, or NULL if none of those transitions happened or the
+ * hangup cause details could not be packed.
+ */
+static struct ast_json *channel_state(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
+	struct ast_channel_snapshot *subject = new_snapshot ? new_snapshot : old_snapshot;
+	char *type;
+
+	if (!old_snapshot) {
+		/* No previous snapshot: the channel just appeared */
+		type = "channel-event-create";
+	} else if (!new_snapshot) {
+		/* No new snapshot: the channel is gone, report why */
+		type = "channel-event-destroy";
+		extra = ast_json_pack("{s: i, s: s}",
+			"Cause", old_snapshot->hangupcause,
+			"Cause-txt", ast_cause2str(old_snapshot->hangupcause));
+		if (!extra) {
+			return NULL;
+		}
+	} else if (old_snapshot->state != new_snapshot->state) {
+		type = "channel-event-state";
+	} else {
+		/* Nothing this monitor cares about changed */
+		return NULL;
+	}
+
+	return app_event_create(type, subject, extra);
+}
+
+/*!
+ * \brief Compares the context, exten and priority of two snapshots.
+ * \param old_snapshot Old snapshot
+ * \param new_snapshot New snapshot
+ * \return True (non-zero) if context, exten and priority are all identical.
+ * \return False (zero) if context, exten or priority changed, or if the
+ * application went from empty to set.
+ */
+static inline int cep_equal(
+ const struct ast_channel_snapshot *old_snapshot,
+ const struct ast_channel_snapshot *new_snapshot)
+{
+ ast_assert(old_snapshot != NULL);
+ ast_assert(new_snapshot != NULL);
+
+ /* We actually get some snapshots with CEP set, but before the
+ * application is set. Since empty application is invalid, we treat
+ * setting the application from nothing as a CEP change.
+ */
+ if (ast_strlen_zero(old_snapshot->appl) &&
+ !ast_strlen_zero(new_snapshot->appl)) {
+ return 0;
+ }
+
+ return old_snapshot->priority == new_snapshot->priority &&
+ strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
+ strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
+}
+
+/*!
+ * \brief Build the dialplan event for a snapshot update
+ *
+ * \return Event message, or NULL if there is no new snapshot, the new snapshot
+ * has no application, the dialplan location is unchanged, or packing failed.
+ */
+static struct ast_json *channel_dialplan(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, details, NULL, ast_json_unref);
+
+	/* No Newexten event on cache clear, and an empty application is
+	 * not valid for a Newexten event
+	 */
+	if (!new_snapshot || ast_strlen_zero(new_snapshot->appl)) {
+		return NULL;
+	}
+
+	if (old_snapshot && cep_equal(old_snapshot, new_snapshot)) {
+		return NULL;
+	}
+
+	details = ast_json_pack("{s: s, s: s}",
+		"Application", new_snapshot->appl,
+		"ApplicationData", new_snapshot->data);
+
+	return details
+		? app_event_create("channel-event-dialplan", new_snapshot, details)
+		: NULL;
+}
+
+/*!
+ * \brief Tell whether two snapshots carry the same caller number and name.
+ * \param old_snapshot Old snapshot
+ * \param new_snapshot New snapshot
+ * \return True (non-zero) if caller number and caller name are both identical.
+ * \return False (zero) if either of them differs.
+ */
+static inline int caller_id_equal(
+	const struct ast_channel_snapshot *old_snapshot,
+	const struct ast_channel_snapshot *new_snapshot)
+{
+	ast_assert(old_snapshot != NULL);
+	ast_assert(new_snapshot != NULL);
+
+	if (strcmp(old_snapshot->caller_number, new_snapshot->caller_number) != 0) {
+		return 0;
+	}
+
+	return strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
+}
+
+/*!
+ * \brief Build the caller id event for a snapshot update
+ *
+ * \return Event message, or NULL if either snapshot is missing, the caller id
+ * is unchanged, or the presentation details could not be packed.
+ */
+static struct ast_json *channel_callerid(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	RAII_VAR(struct ast_json *, details, NULL, ast_json_unref);
+
+	/* No NewCallerid event on cache clear or first event, nor when
+	 * the caller id did not change
+	 */
+	if (!old_snapshot || !new_snapshot ||
+		caller_id_equal(old_snapshot, new_snapshot)) {
+		return NULL;
+	}
+
+	details = ast_json_pack("{s: i, s: s}",
+		"CallerPresentation", new_snapshot->caller_pres,
+		"CallerPresentation-txt", ast_describe_caller_presentation(new_snapshot->caller_pres));
+
+	return details
+		? app_event_create("channel-event-callerid", new_snapshot, details)
+		: NULL;
+}
+
+/*! \brief Build a plain snapshot event whenever a new snapshot is available */
+static struct ast_json *channel_snapshot(
+	struct ast_channel_snapshot *old_snapshot,
+	struct ast_channel_snapshot *new_snapshot)
+{
+	/* Nothing to publish once the channel has been cleared from the cache */
+	return new_snapshot
+		? app_event_create("channel-snapshot", new_snapshot, NULL)
+		: NULL;
+}
+
+/*!
+ * \brief Monitors run, in this order, for every channel snapshot update
+ *
+ * File-local: only the snapshot handler in this file iterates over it.
+ */
+static snapshot_monitor monitors[] = {
+	channel_snapshot,
+	channel_state,
+	channel_dialplan,
+	channel_callerid
+};
+
+/*! \brief ao2 callback: send the JSON message passed as \a arg to the app in \a obj */
+static int app_send_cb(void *obj, void *arg, int flags)
+{
+	struct app *recipient = obj;
+	struct ast_json *event = arg;
+
+	app_send(recipient, event);
+
+	/* Never report a match so the traversal visits every app */
+	return 0;
+}
+
+static void sub_snapshot_handler(void *data,
+ struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+ struct stasis_cache_update *update = stasis_message_data(message);
+ struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot);
+ struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot);
+ int i;
+
+ watching_apps = get_watching_apps(new_snapshot ? new_snapshot : old_snapshot);
+ if (!watching_apps) {
+ return;
+ }
+
+ for (i = 0; i < ARRAY_LEN(monitors); ++i) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
- struct ast_channel_snapshot *snapshot =
- stasis_message_data(message);
-
- msg = app_event_create("channel-state-change", snapshot, NULL);
- if (!msg) {
- return;
+
+ msg = monitors[i](old_snapshot, new_snapshot);
+ if (msg) {
+ ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
- app_send(app, msg);
- }
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(data);
- }
+ }
+}
+
+/*!
+ * \brief Forward a channel blob message to every app watching its channel.
+ *
+ * The event is named "channel-event-<type>", where <type> comes from
+ * ast_channel_blob_json_type(). Messages without a snapshot, or for a channel
+ * no app is watching, are dropped.
+ *
+ * \param data Unused.
+ * \param sub Unused.
+ * \param topic Unused.
+ * \param message Message whose payload is a channel blob.
+ */
+static void sub_blob_handler(void *data,
+		struct stasis_subscription *sub,
+		struct stasis_topic *topic,
+		struct stasis_message *message)
+{
+	RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+	RAII_VAR(struct ast_str *, event_name, ast_str_create(32), ast_free);
+	RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
+	struct ast_channel_blob *obj = stasis_message_data(message);
+
+	/* The string buffer allocation can fail; bail out before it is used */
+	if (!event_name) {
+		return;
+	}
+
+	if (!obj->snapshot) {
+		return;
+	}
+
+	watching_apps = get_watching_apps(obj->snapshot);
+	if (!watching_apps) {
+		return;
+	}
+
+	ast_str_set(&event_name, 0, "channel-event-%s", ast_channel_blob_json_type(obj));
+
+	msg = app_event_create(ast_str_buffer(event_name), obj->snapshot, obj->blob);
+	if (!msg) {
+		return;
+	}
+
+	ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg);
}
/*!
@@ -375,7 +644,6 @@
RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
RAII_VAR(struct app *, app, NULL, ao2_cleanup);
RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, stasis_unsubscribe);
int res = 0;
char *parse = NULL;
int hungup = 0;
@@ -415,17 +683,15 @@
ao2_link(controls, control);
}
- subscription = stasis_subscribe(ast_channel_topic(chan), sub_handler, app);
- if (subscription == NULL) {
- ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n", args.app_name, ast_channel_name(chan));
- return -1;
- }
- ao2_ref(app, +1); /* subscription now has a reference */
-
res = send_start_msg(app, chan, args.argc - 1, args.app_argv);
if (res != 0) {
ast_log(LOG_ERROR, "Error sending start message to %s\n", args.app_name);
return res;
+ }
+
+ if (app_add_channel(app, chan)) {
+ ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), args.app_name);
+ return -1;
}
while (!hungup && !control_continue_test_and_reset(control) && ast_waitfor(chan, -1) > -1) {
@@ -448,6 +714,7 @@
}
}
+ app_remove_channel(app, chan);
res = send_end_msg(app, chan);
if (res != 0) {
ast_log(LOG_ERROR, "Error sending end message to %s\n", args.app_name);
@@ -532,6 +799,14 @@
return AST_MODULE_LOAD_FAILURE;
}
+ app_channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ if (!app_channel_router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ r |= stasis_message_router_add(app_channel_router, stasis_cache_update_type(), sub_snapshot_handler, NULL);
+ r |= stasis_message_router_add(app_channel_router, ast_channel_blob_type(), sub_blob_handler, NULL);
+
r |= ast_register_application_xml(stasis, app_stasis_exec);
return r;
}
@@ -539,6 +814,9 @@
static int unload_module(void)
{
int r = 0;
+
+ stasis_message_router_unsubscribe(app_channel_router);
+ app_channel_router = NULL;
ao2_cleanup(__apps_registry);
__apps_registry = NULL;
Modified: team/kmoore/stasis-bridging-channel_events/include/asterisk/strings.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridging-channel_events/include/asterisk/strings.h?view=diff&rev=384950&r1=384949&r2=384950
==============================================================================
--- team/kmoore/stasis-bridging-channel_events/include/asterisk/strings.h (original)
+++ team/kmoore/stasis-bridging-channel_events/include/asterisk/strings.h Mon Apr 8 11:12:23 2013
@@ -1013,4 +1013,12 @@
return abs(hash);
}
+/*!
+ * \since 12
+ * \brief Allocates a hash container for bare strings
+ * \param buckets Number of buckets to use for the hash container
+ * \retval AO2 container for strings
+ * \retval NULL if allocation failed
+ */
+struct ao2_container *ast_str_container_alloc(int buckets);
+
#endif /* _ASTERISK_STRINGS_H */
Modified: team/kmoore/stasis-bridging-channel_events/main/strings.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-bridging-channel_events/main/strings.c?view=diff&rev=384950&r1=384949&r2=384950
==============================================================================
--- team/kmoore/stasis-bridging-channel_events/main/strings.c (original)
+++ team/kmoore/stasis-bridging-channel_events/main/strings.c Mon Apr 8 11:12:23 2013
@@ -160,3 +160,17 @@
return (*buf)->__AST_STR_STR;
}
+/*! \brief Hash callback for string containers: hashes the string stored as the object */
+static int str_hash(const void *obj, const int flags)
+{
+	const char *str = obj;
+
+	return ast_str_hash(str);
+}
+
+/*! \brief Compare callback for string containers: matches when both strings are equal */
+static int str_cmp(void *lhs, void *rhs, int flags)
+{
+	if (strcmp(lhs, rhs) == 0) {
+		return CMP_MATCH;
+	}
+
+	return 0;
+}
+
+/* Build a container keyed and compared by the string contents of its objects */
+struct ao2_container *ast_str_container_alloc(int buckets)
+{
+	struct ao2_container *strings;
+
+	strings = ao2_container_alloc(buckets, str_hash, str_cmp);
+	return strings;
+}
More information about the asterisk-commits
mailing list