[asterisk-commits] ARI: Add the ability to subscribe to all events (asterisk[master])
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Sep 23 12:56:37 CDT 2015
Matt Jordan has submitted this change and it was merged.
Change subject: ARI: Add the ability to subscribe to all events
......................................................................
ARI: Add the ability to subscribe to all events
This patch adds the ability to subscribe to all events. There are two possible
ways to accomplish this:
(1) On initial WebSocket connection. This patch adds a new query parameter,
    'subscribeAll'. If present and true, Asterisk will subscribe the
    applications to all ARI events.
(2) Via the applications resource. When subscribing in this manner, an ARI
    client should merely specify a blank resource name, i.e., 'channels:'
    instead of 'channels:12354'. This will subscribe the application to all
    resources of the 'channels' type.
ASTERISK-24870 #close
Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
---
M include/asterisk/stasis_app.h
M res/ari/resource_events.c
M res/ari/resource_events.h
M res/res_ari_events.c
M res/res_stasis.c
M res/stasis/app.c
M res/stasis/app.h
M res/stasis/messaging.c
M rest-api/api-docs/events.json
9 files changed, 361 insertions(+), 151 deletions(-)
Approvals:
Anonymous Coward #1000019: Verified
Matt Jordan: Looks good to me, approved
Joshua Colp: Looks good to me, but someone else must approve
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index 567670b..f2b07e0 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -92,6 +92,21 @@
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
/*!
+ * \brief Register a new Stasis application that receives all Asterisk events.
+ *
+ * If an application is already registered with the given name, the old
+ * application is sent a 'replaced' message and unregistered.
+ *
+ * \param app_name Name of this application.
+ * \param handler Callback for application messages.
+ * \param data Data blob to pass to the callback. Must be AO2 managed.
+ *
+ * \return 0 for success
+ * \return -1 for error.
+ */
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
+
+/*!
* \brief Unregister a Stasis application.
* \param app_name Name of the application to unregister.
*/
diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c
index deb7f9c..8fa15f5 100644
--- a/res/ari/resource_events.c
+++ b/res/ari/resource_events.c
@@ -280,7 +280,9 @@
}
event_session_shutdown(session);
- ao2_unlink(event_session_registry, session);
+ if (event_session_registry) {
+ ao2_unlink(event_session_registry, session);
+ }
}
/*!
@@ -367,6 +369,7 @@
struct ast_ari_events_event_websocket_args *args, const char *session_id)
{
RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
+ int (* register_handler)(const char *, stasis_app_cb handler, void *data);
size_t size, i;
/* The request must have at least one [app] parameter */
@@ -399,6 +402,12 @@
}
/* Register the apps with Stasis */
+ if (args->subscribe_all) {
+ register_handler = &stasis_app_register_all;
+ } else {
+ register_handler = &stasis_app_register;
+ }
+
for (i = 0; i < args->app_count; ++i) {
const char *app = args->app[i];
@@ -411,10 +420,10 @@
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
- if (stasis_app_register(app, stasis_app_message_handler, session)) {
+ if (register_handler(app, stasis_app_message_handler, session)) {
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
return event_session_allocation_error_handler(
- session, ERROR_TYPE_STASIS_REGISTRATION, ser);
+ session, ERROR_TYPE_STASIS_REGISTRATION, ser);
}
}
@@ -426,8 +435,17 @@
return 0;
}
+static int event_session_shutdown_cb(void *session, void *arg, int flags)
+{
+ event_session_cleanup(session);
+
+ return 0;
+}
+
void ast_ari_websocket_events_event_websocket_dtor(void)
{
+ ao2_callback(event_session_registry, OBJ_MULTIPLE | OBJ_NODATA, event_session_shutdown_cb, NULL);
+
ao2_cleanup(event_session_registry);
event_session_registry = NULL;
}
@@ -462,7 +480,8 @@
struct ast_ari_websocket_session *ws_session, struct ast_variable *headers,
struct ast_ari_events_event_websocket_args *args)
{
- RAII_VAR(struct event_session *, session, NULL, event_session_cleanup);
+ struct event_session *session;
+
struct ast_json *msg;
const char *session_id;
@@ -474,7 +493,6 @@
/* Find the event_session and update its websocket */
session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY);
-
if (session) {
ao2_unlink(event_session_registry, session);
event_session_update_websocket(session, ws_session);
@@ -487,6 +505,9 @@
while ((msg = ast_ari_websocket_session_read(ws_session))) {
ast_json_unref(msg);
}
+
+ event_session_cleanup(session);
+ ao2_ref(session, -1);
}
void ast_ari_events_user_event(struct ast_variable *headers,
diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h
index aa1e3df..8c03af4 100644
--- a/res/ari/resource_events.h
+++ b/res/ari/resource_events.h
@@ -47,6 +47,8 @@
size_t app_count;
/*! Parsing context for app. */
char *app_parse;
+ /*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
+ int subscribe_all;
};
/*!
diff --git a/res/res_ari_events.c b/res/res_ari_events.c
index 4b2b151..e4fda0a 100644
--- a/res/res_ari_events.c
+++ b/res/res_ari_events.c
@@ -111,6 +111,9 @@
args.app[j] = (vals[j]);
}
} else
+ if (strcmp(i->name, "subscribeAll") == 0) {
+ args.subscribe_all = ast_true(i->value);
+ } else
{}
}
@@ -209,6 +212,9 @@
args.app[j] = (vals[j]);
}
} else
+ if (strcmp(i->name, "subscribeAll") == 0) {
+ args.subscribe_all = ast_true(i->value);
+ } else
{}
}
diff --git a/res/res_stasis.c b/res/res_stasis.c
index f7d8299..69e9b93 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -109,6 +109,11 @@
struct ao2_container *app_bridges_playback;
+/*!
+ * \internal \brief List of registered event sources.
+ */
+AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
+
static struct ast_json *stasis_end_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
@@ -1469,7 +1474,7 @@
return ao2_bump(apps);
}
-int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@@ -1482,8 +1487,20 @@
if (app) {
app_update(app, handler, data);
} else {
- app = app_create(app_name, handler, data);
+ app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
if (app) {
+ if (all_events) {
+ struct stasis_app_event_source *source;
+ SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+ AST_LIST_TRAVERSE(&event_sources, source, next) {
+ if (!source->subscribe) {
+ continue;
+ }
+
+ source->subscribe(app, NULL);
+ }
+ }
ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
} else {
ao2_unlock(apps_registry);
@@ -1497,6 +1514,16 @@
cleanup();
ao2_unlock(apps_registry);
return 0;
+}
+
+int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+{
+ return __stasis_app_register(app_name, handler, data, 0);
+}
+
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
+{
+ return __stasis_app_register(app_name, handler, data, 1);
}
void stasis_app_unregister(const char *app_name)
@@ -1525,11 +1552,6 @@
*/
cleanup();
}
-
-/*!
- * \internal \brief List of registered event sources.
- */
-AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
{
@@ -1727,8 +1749,8 @@
ast_debug(3, "%s: Checking %s\n", app_name, uri);
- if (!event_source->find ||
- (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
+ if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
+ (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
@@ -2062,6 +2084,7 @@
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
+ .load_pri = AST_MODPRI_APP_DEPEND,
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,
diff --git a/res/stasis/app.c b/res/stasis/app.c
index b99e232..5002a0b 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -38,6 +38,10 @@
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
@@ -47,12 +51,16 @@
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
+ /*! Optional router for handling endpoint messages in 'all' subscriptions */
+ struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
+ /*! Subscription model for the application */
+ enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
@@ -121,34 +129,33 @@
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !chan) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_channel_uniqueid(chan));
+ forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
- forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (chan) {
+ forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_channel_topic_cached(chan), app->topic);
- if (!forwards->topic_cached_forward) {
+ chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
}
@@ -156,69 +163,100 @@
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
- if (!app || !bridge) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, bridge->uniqueid);
+ forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
- forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (bridge) {
+ forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+ app->topic);
}
-
forwards->topic_cached_forward = stasis_forward_all(
- ast_bridge_topic_cached(bridge), app->topic);
- if (!forwards->topic_cached_forward) {
+ bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+ app->topic);
+
+ if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
return NULL;
}
- ao2_ref(forwards, +1);
return forwards;
+}
+
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+ struct stasis_message *message)
+{
+ struct stasis_app *app = data;
+
+ stasis_publish(app->topic, message);
}
/*! Forward an endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+ struct app_forwards *forwards;
+ int ret = 0;
- if (!app || !endpoint) {
+ if (!app) {
return NULL;
}
- forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+ forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
- forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
- app->topic);
- if (!forwards->topic_forward) {
- return NULL;
+ if (endpoint) {
+ forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+ app->topic);
+ forwards->topic_cached_forward = stasis_forward_all(
+ ast_endpoint_topic_cached(endpoint), app->topic);
+
+ if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+ /* Half-subscribed is a bad thing */
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
+ } else {
+ /* Since endpoint subscriptions also subscribe to channels, in the case
+ * of all endpoint subscriptions, we only want messages for the endpoints.
+ * As such, we route those particular messages and then re-publish them
+ * on the app's topic.
+ */
+ ast_assert(app->endpoint_router == NULL);
+ app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+ if (!app->endpoint_router) {
+ forwards_unsubscribe(forwards);
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
+
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_state_type(), endpoint_state_cb, app);
+ ret |= stasis_message_router_add(app->endpoint_router,
+ ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+ if (ret) {
+ ao2_ref(app->endpoint_router, -1);
+ app->endpoint_router = NULL;
+ ao2_ref(forwards, -1);
+ return NULL;
+ }
}
- forwards->topic_cached_forward = stasis_forward_all(
- ast_endpoint_topic_cached(endpoint), app->topic);
- if (!forwards->topic_cached_forward) {
- /* Half-subscribed is a bad thing */
- stasis_forward_cancel(forwards->topic_forward);
- forwards->topic_forward = NULL;
- return NULL;
- }
-
- ao2_ref(forwards, +1);
return forwards;
}
@@ -260,6 +298,7 @@
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
+ ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
@@ -793,7 +832,7 @@
}
}
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
@@ -806,10 +845,10 @@
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
if (!app) {
return NULL;
}
+ app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@@ -877,7 +916,8 @@
return app;
}
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
return app->topic;
}
@@ -930,6 +970,8 @@
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
+ stasis_message_router_unsubscribe(app->endpoint_router);
+ app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
@@ -1029,34 +1071,47 @@
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
int res;
- if (!app || !chan) {
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_channel(app, chan);
- if (!forwards) {
- return -1;
- }
-
- res = ao2_link_flags(app->forwards, forwards,
- OBJ_NOLOCK);
- if (!res) {
- return -1;
- }
- }
-
- ++forwards->interested;
- ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
return 0;
}
+
+ forwards = ao2_find(app->forwards,
+ chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_channel(app, chan);
+ if (!forwards) {
+ return -1;
+ }
+
+ res = ao2_link_flags(app->forwards, forwards,
+ OBJ_NOLOCK);
+ if (!res) {
+ ao2_ref(forwards, -1);
+ return -1;
+ }
+ }
+
+ ++forwards->interested;
+ ast_debug(3, "Channel '%s' is %d interested in %s\n",
+ chan ? ast_channel_uniqueid(chan) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
@@ -1068,6 +1123,19 @@
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!id) {
+ if (!strcmp(kind, "bridge")) {
+ id = BRIDGE_ALL;
+ } else if (!strcmp(kind, "channel")) {
+ id = CHANNEL_ALL;
+ } else if (!strcmp(kind, "endpoint")) {
+ id = ENDPOINT_ALL;
+ } else {
+ ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+ return -1;
+ }
+ }
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
@@ -1095,16 +1163,16 @@
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
- if (!app || !chan) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+ return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
- if (!app || !channel_id) {
+ if (!app) {
return -1;
}
@@ -1114,6 +1182,10 @@
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id)) {
+ channel_id = CHANNEL_ALL;
+ }
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
@@ -1133,28 +1205,39 @@
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, bridge->uniqueid,
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
-
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_bridge(app, bridge);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
- }
-
- ++forwards->interested;
- ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
return 0;
}
+
+ forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_bridge(app, bridge);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+ }
+
+ ++forwards->interested;
+ ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+ bridge ? bridge->uniqueid : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
@@ -1164,16 +1247,16 @@
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
- if (!app || !bridge) {
+ if (!app) {
return -1;
}
- return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+ return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- if (!app || !bridge_id) {
+ if (!app) {
return -1;
}
@@ -1182,9 +1265,26 @@
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
- return forwards != NULL;
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ if (ast_strlen_zero(bridge_id)) {
+ bridge_id = BRIDGE_ALL;
+ }
+
+ forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
+ return 1;
+ }
+
+ return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
@@ -1202,31 +1302,43 @@
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
- if (!app || !endpoint) {
+ struct app_forwards *forwards;
+ SCOPED_AO2LOCK(lock, app->forwards);
+
+ if (!app) {
return -1;
- } else {
- RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
- SCOPED_AO2LOCK(lock, app->forwards);
+ }
- forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
- OBJ_SEARCH_KEY | OBJ_NOLOCK);
-
- if (!forwards) {
- /* Forwards not found, create one */
- forwards = forwards_create_endpoint(app, endpoint);
- if (!forwards) {
- return -1;
- }
- ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
- /* Subscribe for messages */
- messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
- }
-
- ++forwards->interested;
- ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
+ /* If subscribed to all, don't subscribe again */
+ forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (forwards) {
+ ao2_ref(forwards, -1);
return 0;
}
+
+ forwards = ao2_find(app->forwards,
+ endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+ OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (!forwards) {
+ /* Forwards not found, create one */
+ forwards = forwards_create_endpoint(app, endpoint);
+ if (!forwards) {
+ return -1;
+ }
+ ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+
+ /* Subscribe for messages */
+ messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+ }
+
+ ++forwards->interested;
+ ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+ endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+ forwards->interested,
+ app->name);
+
+ ao2_ref(forwards, -1);
+ return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
@@ -1236,7 +1348,7 @@
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
- if (!app || !endpoint_id) {
+ if (!app) {
return -1;
}
@@ -1246,6 +1358,10 @@
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(endpoint_id)) {
+ endpoint_id = ENDPOINT_ALL;
+ }
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
diff --git a/res/stasis/app.h b/res/stasis/app.h
index 59574f5..2c8db1c 100644
--- a/res/stasis/app.h
+++ b/res/stasis/app.h
@@ -36,6 +36,19 @@
*/
struct stasis_app;
+enum stasis_app_subscription_model {
+ /*!
+ * \brief An application must manually subscribe to each
+ * resource that it cares about. This is the default approach.
+ */
+ STASIS_APP_SUBSCRIBE_MANUAL,
+ /*!
+ * \brief An application is automatically subscribed to all
+ * resources in Asterisk, even if it does not control them.
+ */
+ STASIS_APP_SUBSCRIBE_ALL
+};
+
/*!
* \brief Create a res_stasis application.
*
@@ -45,7 +58,7 @@
* \return New \c res_stasis application.
* \return \c NULL on error.
*/
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
/*!
* \brief Tears down an application.
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c
index bf8aebb..16e167e 100644
--- a/res/stasis/messaging.c
+++ b/res/stasis/messaging.c
@@ -38,6 +38,11 @@
#include "messaging.h"
/*!
+ * \brief Subscription to all technologies
+ */
+#define TECH_WILDCARD "__AST_ALL_TECH"
+
+/*!
* \brief Number of buckets for the \ref endpoint_subscriptions container
*/
#define ENDPOINTS_NUM_BUCKETS 127
@@ -219,10 +224,14 @@
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
- || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+ if (!sub) {
+ continue;
+ }
+
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
- sub = NULL; /* No ref bump! */
goto match;
}
@@ -231,6 +240,7 @@
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
+ ao2_ref(sub, -1);
goto match;
}
@@ -238,7 +248,6 @@
return 0;
match:
- ao2_cleanup(sub);
return 1;
}
@@ -301,7 +310,8 @@
continue;
}
- if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+ if (!strcmp(sub->token, TECH_WILDCARD)
+ || !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
@@ -374,7 +384,7 @@
{
struct message_subscription *sub = NULL;
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
} else {
int i;
@@ -383,7 +393,7 @@
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
- if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+ if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
ao2_bump(sub);
break;
}
@@ -400,10 +410,6 @@
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
endpoint = ast_endpoint_find_by_id(endpoint_id);
- if (!endpoint) {
- return;
- }
-
sub = get_subscription(endpoint);
if (!sub) {
return;
@@ -417,11 +423,11 @@
AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
if (AST_VECTOR_SIZE(&sub->applications) == 0) {
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_unlink(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
- AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+ AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
ast_rwlock_unlock(&tech_subscriptions_lock);
}
@@ -429,9 +435,9 @@
ao2_unlock(sub);
ao2_ref(sub, -1);
- ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
}
static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@@ -442,12 +448,12 @@
return sub;
}
- sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+ sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
if (!sub) {
return NULL;
}
- if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+ if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_link(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
@@ -482,9 +488,9 @@
AST_VECTOR_APPEND(&sub->applications, tuple);
ao2_unlock(sub);
- ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+ ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
- app_name, ast_endpoint_get_id(endpoint));
+ app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
return 0;
}
diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json
index 54269a4..dee7c2d 100644
--- a/rest-api/api-docs/events.json
+++ b/rest-api/api-docs/events.json
@@ -26,6 +26,14 @@
"required": true,
"allowMultiple": true,
"dataType": "string"
+ },
+ {
+ "name": "subscribeAll",
+ "description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
+ "paramType": "query",
+ "required": false,
+ "allowMultiple": false,
+ "dataType": "boolean"
}
]
}
--
To view, visit https://gerrit.asterisk.org/1297
To unsubscribe, visit https://gerrit.asterisk.org/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Owner: Matt Jordan <mjordan at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
More information about the asterisk-commits
mailing list