[Asterisk-code-review] ARI: Add the ability to subscribe to all events (asterisk[13])

Matt Jordan asteriskteam at digium.com
Tue Sep 8 20:13:54 CDT 2015


Matt Jordan has uploaded a new change for review.

  https://gerrit.asterisk.org/1193

Change subject: ARI: Add the ability to subscribe to all events
......................................................................

ARI: Add the ability to subscribe to all events

This patch adds the ability to subscribe to all events. There are two possible
ways to accomplish this:
(1) On initial WebSocket connection. This patch adds a new query parameter,
    'subscribeAll'. If present and True, Asterisk will subscribe the
    applications to all ARI events.
(2) Via the applications resource. When subscribing in this manner, an ARI
    client should merely specify a blank resource name, i.e., 'channels:'
    instead of 'channels:12354'. This will subscribe the application to all
    resources of the 'channels' type.

ASTERISK-24870 #close

Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
---
M CHANGES
M include/asterisk/stasis_app.h
M res/ari/resource_events.c
M res/ari/resource_events.h
M res/res_ari_events.c
M res/res_stasis.c
M res/stasis/app.c
M res/stasis/app.h
M res/stasis/messaging.c
M rest-api/api-docs/events.json
10 files changed, 374 insertions(+), 153 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/93/1193/2

diff --git a/CHANGES b/CHANGES
index c249195..0c467a8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,21 @@
    return the SIP Call-ID associated with the INVITE request that established
    the PJSIP channel.
 
+ARI
+------------------
+ * Added the ability to subscribe to all ARI events in Asterisk, regardless
+   of whether the application 'controls' the resource. This is useful for
+   scenarios where an ARI application merely wants to observe the system,
+   as opposed to control it. There are two ways to accomplish this:
+   (1) Via the WebSocket connection URI. A new query paramter, 'subscribeAll',
+       has been added that, when present and True, will subscribe all
+       specified applications to all ARI event sources in Asterisk.
+   (2) Via the applications resource. An ARI client can, at any time, subscribe
+       to all resources in an event source merely by not providing an explicit
+       resource. For example, subscribing to an event source of 'channels:'
+       as opposed to 'channels:12345' will subscribe the application to all
+       channels.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 13.4.0 to Asterisk 13.5.0 ------------
 ------------------------------------------------------------------------------
diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h
index 567670b..f2b07e0 100644
--- a/include/asterisk/stasis_app.h
+++ b/include/asterisk/stasis_app.h
@@ -92,6 +92,21 @@
 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
 
 /*!
+ * \brief Register a new Stasis application that receives all Asterisk events.
+ *
+ * If an application is already registered with the given name, the old
+ * application is sent a 'replaced' message and unregistered.
+ *
+ * \param app_name Name of this application.
+ * \param handler Callback for application messages.
+ * \param data Data blob to pass to the callback. Must be AO2 managed.
+ *
+ * \return 0 for success
+ * \return -1 for error.
+ */
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
+
+/*!
  * \brief Unregister a Stasis application.
  * \param app_name Name of the application to unregister.
  */
diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c
index 09bcafc..71d54b4 100644
--- a/res/ari/resource_events.c
+++ b/res/ari/resource_events.c
@@ -148,9 +148,11 @@
  * \brief Register for all of the apps given.
  * \param session Session info struct.
  * \param app_name Name of application to register.
+ * \param register_handler Pointer to the application registration handler
  */
 static int session_register_app(struct event_session *session,
-				 const char *app_name)
+				 const char *app_name,
+				 int (* register_handler)(const char *, stasis_app_cb handler, void *data))
 {
 	SCOPED_AO2LOCK(lock, session);
 
@@ -167,7 +169,7 @@
 		return -1;
 	}
 
-	stasis_app_register(app_name, app_handler, session);
+	register_handler(app_name, app_handler, session);
 
 	return 0;
 }
@@ -178,6 +180,7 @@
 {
 	int res = 0;
 	size_t i, j;
+	int (* register_handler)(const char *, stasis_app_cb handler, void *data);
 
 	ast_debug(3, "/events WebSocket attempted\n");
 
@@ -186,13 +189,19 @@
 		return -1;
 	}
 
+	if (args->subscribe_all) {
+		register_handler = &stasis_app_register_all;
+	} else {
+		register_handler = &stasis_app_register;
+	}
+
 	for (i = 0; i < args->app_count; ++i) {
 		if (ast_strlen_zero(args->app[i])) {
 			res = -1;
 			break;
 		}
 
-		res |= stasis_app_register(args->app[i], app_handler, NULL);
+		res |= register_handler(args->app[i], app_handler, NULL);
 	}
 
 	if (res) {
@@ -213,6 +222,7 @@
 	struct ast_json *msg;
 	int res;
 	size_t i;
+	int (* register_handler)(const char *, stasis_app_cb handler, void *data);
 
 	ast_debug(3, "/events WebSocket connection\n");
 
@@ -222,12 +232,18 @@
 		return;
 	}
 
+	if (args->subscribe_all) {
+		register_handler = &stasis_app_register_all;
+	} else {
+		register_handler = &stasis_app_register;
+	}
+
 	res = 0;
 	for (i = 0; i < args->app_count; ++i) {
 		if (ast_strlen_zero(args->app[i])) {
 			continue;
 		}
-		res |= session_register_app(session, args->app[i]);
+		res |= session_register_app(session, args->app[i], register_handler);
 	}
 
 	if (ao2_container_count(session->websocket_apps) == 0) {
diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h
index 2b63181..c482699 100644
--- a/res/ari/resource_events.h
+++ b/res/ari/resource_events.h
@@ -47,6 +47,8 @@
 	size_t app_count;
 	/*! Parsing context for app. */
 	char *app_parse;
+	/*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
+	int subscribe_all;
 };
 
 /*!
diff --git a/res/res_ari_events.c b/res/res_ari_events.c
index 4265385..65bd38d 100644
--- a/res/res_ari_events.c
+++ b/res/res_ari_events.c
@@ -110,6 +110,9 @@
 				args.app[j] = (vals[j]);
 			}
 		} else
+		if (strcmp(i->name, "subscribeAll") == 0) {
+			args.subscribe_all = ast_true(i->value);
+		} else
 		{}
 	}
 
@@ -208,6 +211,9 @@
 				args.app[j] = (vals[j]);
 			}
 		} else
+		if (strcmp(i->name, "subscribeAll") == 0) {
+			args.subscribe_all = ast_true(i->value);
+		} else
 		{}
 	}
 
diff --git a/res/res_stasis.c b/res/res_stasis.c
index fc34fa3..25866d9 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -109,6 +109,11 @@
 
 struct ao2_container *app_bridges_playback;
 
+/*!
+ * \internal \brief List of registered event sources.
+ */
+AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
+
 static struct ast_json *stasis_end_to_json(struct stasis_message *message,
 		const struct stasis_message_sanitizer *sanitize)
 {
@@ -1469,7 +1474,7 @@
 	return ao2_bump(apps);
 }
 
-int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
 {
 	RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
 
@@ -1482,8 +1487,20 @@
 	if (app) {
 		app_update(app, handler, data);
 	} else {
-		app = app_create(app_name, handler, data);
+		app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
 		if (app) {
+			if (all_events) {
+				struct stasis_app_event_source *source;
+				SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+				AST_LIST_TRAVERSE(&event_sources, source, next) {
+					if (!source->subscribe) {
+						continue;
+					}
+
+					source->subscribe(app, NULL);
+				}
+			}
 			ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
 		} else {
 			ao2_unlock(apps_registry);
@@ -1497,6 +1514,16 @@
 	cleanup();
 	ao2_unlock(apps_registry);
 	return 0;
+}
+
+int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
+{
+	return __stasis_app_register(app_name, handler, data, 0);
+}
+
+int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
+{
+	return __stasis_app_register(app_name, handler, data, 1);
 }
 
 void stasis_app_unregister(const char *app_name)
@@ -1525,11 +1552,6 @@
 	 */
 	cleanup();
 }
-
-/*!
- * \internal \brief List of registered event sources.
- */
-AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
 
 void stasis_app_register_event_source(struct stasis_app_event_source *obj)
 {
@@ -1727,8 +1749,8 @@
 
 	ast_debug(3, "%s: Checking %s\n", app_name, uri);
 
-	if (!event_source->find ||
-	    (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
+	if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
+	    (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
 		ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
 		return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
 	}
@@ -2062,6 +2084,7 @@
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
+	.load_pri = AST_MODPRI_APP_DEPEND,
 	.support_level = AST_MODULE_SUPPORT_CORE,
 	.load = load_module,
 	.unload = unload_module,
diff --git a/res/stasis/app.c b/res/stasis/app.c
index caa27ab..dfb46c5 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -38,6 +38,10 @@
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stasis_message_router.h"
 
+#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
+#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
+#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
+
 static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
 
 struct stasis_app {
@@ -47,12 +51,16 @@
 	struct stasis_message_router *router;
 	/*! Router for handling messages to the bridge all \a topic. */
 	struct stasis_message_router *bridge_router;
+	/*! Optional router for handling endpoint messages in 'all' subscriptions */
+	struct stasis_message_router *endpoint_router;
 	/*! Container of the channel forwards to this app's topic. */
 	struct ao2_container *forwards;
 	/*! Callback function for this application. */
 	stasis_app_cb handler;
 	/*! Opaque data to hand to callback function. */
 	void *data;
+	/*! Subscription model for the application */
+	enum stasis_app_subscription_model subscription_model;
 	/*! Name of the Stasis application */
 	char name[];
 };
@@ -121,34 +129,33 @@
 static struct app_forwards *forwards_create_channel(struct stasis_app *app,
 	struct ast_channel *chan)
 {
-	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+	struct app_forwards *forwards;
 
-	if (!app || !chan) {
+	if (!app) {
 		return NULL;
 	}
 
-	forwards = forwards_create(app, ast_channel_uniqueid(chan));
+	forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
 	if (!forwards) {
 		return NULL;
 	}
 
 	forwards->forward_type = FORWARD_CHANNEL;
-	forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
-		app->topic);
-	if (!forwards->topic_forward) {
-		return NULL;
+	if (chan) {
+		forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+			app->topic);
 	}
-
 	forwards->topic_cached_forward = stasis_forward_all(
-		ast_channel_topic_cached(chan), app->topic);
-	if (!forwards->topic_cached_forward) {
+		chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+		app->topic);
+
+	if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
 		/* Half-subscribed is a bad thing */
-		stasis_forward_cancel(forwards->topic_forward);
-		forwards->topic_forward = NULL;
+		forwards_unsubscribe(forwards);
+		ao2_ref(forwards, -1);
 		return NULL;
 	}
 
-	ao2_ref(forwards, +1);
 	return forwards;
 }
 
@@ -156,69 +163,101 @@
 static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
 	struct ast_bridge *bridge)
 {
-	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+	struct app_forwards *forwards;
 
-	if (!app || !bridge) {
+	if (!app) {
 		return NULL;
 	}
 
-	forwards = forwards_create(app, bridge->uniqueid);
+	forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
 	if (!forwards) {
 		return NULL;
 	}
 
 	forwards->forward_type = FORWARD_BRIDGE;
-	forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
-		app->topic);
-	if (!forwards->topic_forward) {
-		return NULL;
+	if (bridge) {
+		forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+			app->topic);
 	}
-
 	forwards->topic_cached_forward = stasis_forward_all(
-		ast_bridge_topic_cached(bridge), app->topic);
-	if (!forwards->topic_cached_forward) {
+		bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
+		app->topic);
+
+	if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
 		/* Half-subscribed is a bad thing */
-		stasis_forward_cancel(forwards->topic_forward);
-		forwards->topic_forward = NULL;
+		forwards_unsubscribe(forwards);
+		ao2_ref(forwards, -1);
 		return NULL;
 	}
 
-	ao2_ref(forwards, +1);
 	return forwards;
+}
+
+static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
+	struct stasis_message *message)
+{
+	struct stasis_app *app = data;
+
+	stasis_publish(app->topic, message);
 }
 
 /*! Forward a endpoint's topics to an app */
 static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
 	struct ast_endpoint *endpoint)
 {
-	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
 
-	if (!app || !endpoint) {
+	struct app_forwards *forwards;
+	int ret = 0;
+
+	if (!app) {
 		return NULL;
 	}
 
-	forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
+	forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
 	if (!forwards) {
 		return NULL;
 	}
 
 	forwards->forward_type = FORWARD_ENDPOINT;
-	forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
-		app->topic);
-	if (!forwards->topic_forward) {
-		return NULL;
+	if (endpoint) {
+		forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
+			app->topic);
+		forwards->topic_cached_forward = stasis_forward_all(
+			ast_endpoint_topic_cached(endpoint), app->topic);
+
+		if (!forwards->topic_forward || !forwards->topic_cached_forward) {
+			/* Half-subscribed is a bad thing */
+			forwards_unsubscribe(forwards);
+			ao2_ref(forwards, -1);
+			return NULL;
+		}
+	} else {
+		/* Since endpoint subscriptions also subscribe to channels, in the case
+		 * of all endpoint subscriptions, we only want messages for the endpoints.
+		 * As such, we route those particular messages and then re-publish them
+		 * on the app's topic.
+		 */
+		ast_assert(app->endpoint_router == NULL);
+		app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
+		if (!app->endpoint_router) {
+			forwards_unsubscribe(forwards);
+			ao2_ref(forwards, -1);
+			return NULL;
+		}
+
+		ret |= stasis_message_router_add(app->endpoint_router,
+			ast_endpoint_state_type(), endpoint_state_cb, app);
+		ret |= stasis_message_router_add(app->endpoint_router,
+			ast_endpoint_contact_state_type(), endpoint_state_cb, app);
+
+		if (ret) {
+			ao2_ref(app->endpoint_router, -1);
+			app->endpoint_router = NULL;
+			ao2_ref(forwards, -1);
+			return NULL;
+		}
 	}
 
-	forwards->topic_cached_forward = stasis_forward_all(
-		ast_endpoint_topic_cached(endpoint), app->topic);
-	if (!forwards->topic_cached_forward) {
-		/* Half-subscribed is a bad thing */
-		stasis_forward_cancel(forwards->topic_forward);
-		forwards->topic_forward = NULL;
-		return NULL;
-	}
-
-	ao2_ref(forwards, +1);
 	return forwards;
 }
 
@@ -260,6 +299,7 @@
 
 	ast_assert(app->router == NULL);
 	ast_assert(app->bridge_router == NULL);
+	ast_assert(app->endpoint_router == NULL);
 
 	ao2_cleanup(app->topic);
 	app->topic = NULL;
@@ -793,7 +833,7 @@
 	}
 }
 
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
 {
 	RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
 	size_t size;
@@ -806,10 +846,10 @@
 
 	size = sizeof(*app) + strlen(name) + 1;
 	app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
-
 	if (!app) {
 		return NULL;
 	}
+	app->subscription_model = subscription_model;
 
 	app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
 		AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@@ -877,7 +917,8 @@
 	return app;
 }
 
-struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
+struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
+{
 	return app->topic;
 }
 
@@ -930,6 +971,8 @@
 	app->router = NULL;
 	stasis_message_router_unsubscribe(app->bridge_router);
 	app->bridge_router = NULL;
+	stasis_message_router_unsubscribe(app->endpoint_router);
+	app->endpoint_router = NULL;
 }
 
 int app_is_active(struct stasis_app *app)
@@ -1029,34 +1072,47 @@
 
 int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
 {
+	struct app_forwards *forwards;
+	SCOPED_AO2LOCK(lock, app->forwards);
 	int res;
 
-	if (!app || !chan) {
+	if (!app) {
 		return -1;
-	} else {
-		RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-		SCOPED_AO2LOCK(lock, app->forwards);
+	}
 
-		forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
-			OBJ_SEARCH_KEY | OBJ_NOLOCK);
-		if (!forwards) {
-			/* Forwards not found, create one */
-			forwards = forwards_create_channel(app, chan);
-			if (!forwards) {
-				return -1;
-			}
-
-			res = ao2_link_flags(app->forwards, forwards,
-				OBJ_NOLOCK);
-			if (!res) {
-				return -1;
-			}
-		}
-
-		++forwards->interested;
-		ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
+	/* If subscribed to all, don't subscribe again */
+	forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (forwards) {
+		ao2_ref(forwards, -1);
 		return 0;
 	}
+
+	forwards = ao2_find(app->forwards,
+		chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
+		OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (!forwards) {
+		/* Forwards not found, create one */
+		forwards = forwards_create_channel(app, chan);
+		if (!forwards) {
+			return -1;
+		}
+
+		res = ao2_link_flags(app->forwards, forwards,
+			OBJ_NOLOCK);
+		if (!res) {
+			ao2_ref(forwards, -1);
+			return -1;
+		}
+	}
+
+	++forwards->interested;
+	ast_debug(3, "Channel '%s' is %d interested in %s\n",
+		chan ? ast_channel_uniqueid(chan) : "ALL",
+		forwards->interested,
+		app->name);
+
+	ao2_ref(forwards, -1);
+	return 0;
 }
 
 static int subscribe_channel(struct stasis_app *app, void *obj)
@@ -1068,6 +1124,19 @@
 {
 	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
 	SCOPED_AO2LOCK(lock, app->forwards);
+
+	if (!id) {
+		if (!strcmp(kind, "bridge")) {
+			id = BRIDGE_ALL;
+		} else if (!strcmp(kind, "channel")) {
+			id = CHANNEL_ALL;
+		} else if (!strcmp(kind, "endpoint")) {
+			id = ENDPOINT_ALL;
+		} else {
+			ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
+			return -1;
+		}
+	}
 
 	forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
 	if (!forwards) {
@@ -1095,25 +1164,29 @@
 
 int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
 {
-	if (!app || !chan) {
+	if (!app) {
 		return -1;
 	}
 
-	return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
+	return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
 }
 
 int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
 {
-	if (!app || !channel_id) {
+	if (!app) {
 		return -1;
 	}
 
-	return unsubscribe(app, "channel", channel_id, 0);
+	return unsubscribe(app, "channel", S_OR(channel_id, CHANNEL_ALL), 0);
 }
 
 int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
 {
 	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+	if (ast_strlen_zero(channel_id)) {
+		channel_id = CHANNEL_ALL;
+	}
 	forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
 	return forwards != NULL;
 }
@@ -1133,28 +1206,39 @@
 
 int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
 {
-	if (!app || !bridge) {
+	struct app_forwards *forwards;
+	SCOPED_AO2LOCK(lock, app->forwards);
+
+	if (!app) {
 		return -1;
-	} else {
-		RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-		SCOPED_AO2LOCK(lock, app->forwards);
+	}
 
-		forwards = ao2_find(app->forwards, bridge->uniqueid,
-			OBJ_SEARCH_KEY | OBJ_NOLOCK);
-
-		if (!forwards) {
-			/* Forwards not found, create one */
-			forwards = forwards_create_bridge(app, bridge);
-			if (!forwards) {
-				return -1;
-			}
-			ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-		}
-
-		++forwards->interested;
-		ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
+	/* If subscribed to all, don't subscribe again */
+	forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (forwards) {
+		ao2_ref(forwards, -1);
 		return 0;
 	}
+
+	forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
+		OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (!forwards) {
+		/* Forwards not found, create one */
+		forwards = forwards_create_bridge(app, bridge);
+		if (!forwards) {
+			return -1;
+		}
+		ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+	}
+
+	++forwards->interested;
+	ast_debug(3, "Bridge '%s' is %d interested in %s\n",
+		bridge ? bridge->uniqueid : "ALL",
+		forwards->interested,
+		app->name);
+
+	ao2_ref(forwards, -1);
+	return 0;
 }
 
 static int subscribe_bridge(struct stasis_app *app, void *obj)
@@ -1164,27 +1248,44 @@
 
 int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
 {
-	if (!app || !bridge) {
+	if (!app) {
 		return -1;
 	}
 
-	return app_unsubscribe_bridge_id(app, bridge->uniqueid);
+	return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
 }
 
 int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
 {
-	if (!app || !bridge_id) {
+	if (!app) {
 		return -1;
 	}
 
-	return unsubscribe(app, "bridge", bridge_id, 0);
+	return unsubscribe(app, "bridge", S_OR(bridge_id, BRIDGE_ALL), 0);
 }
 
 int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
 {
-	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-	forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
-	return forwards != NULL;
+	struct app_forwards *forwards;
+	SCOPED_AO2LOCK(lock, app->forwards);
+
+	forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (forwards) {
+		ao2_ref(forwards, -1);
+		return 1;
+	}
+
+	if (ast_strlen_zero(bridge_id)) {
+		bridge_id = BRIDGE_ALL;
+	}
+
+	forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (forwards) {
+		ao2_ref(forwards, -1);
+		return 1;
+	}
+
+	return 0;
 }
 
 static void *bridge_find(const struct stasis_app *app, const char *id)
@@ -1202,31 +1303,43 @@
 
 int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
 {
-	if (!app || !endpoint) {
+	struct app_forwards *forwards;
+	SCOPED_AO2LOCK(lock, app->forwards);
+
+	if (!app) {
 		return -1;
-	} else {
-		RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-		SCOPED_AO2LOCK(lock, app->forwards);
+	}
 
-		forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
-			OBJ_SEARCH_KEY | OBJ_NOLOCK);
-
-		if (!forwards) {
-			/* Forwards not found, create one */
-			forwards = forwards_create_endpoint(app, endpoint);
-			if (!forwards) {
-				return -1;
-			}
-			ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
-
-			/* Subscribe for messages */
-			messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
-		}
-
-		++forwards->interested;
-		ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
+	/* If subscribed to all, don't subscribe again */
+	forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (forwards) {
+		ao2_ref(forwards, -1);
 		return 0;
 	}
+
+	forwards = ao2_find(app->forwards,
+		endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
+		OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (!forwards) {
+		/* Forwards not found, create one */
+		forwards = forwards_create_endpoint(app, endpoint);
+		if (!forwards) {
+			return -1;
+		}
+		ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+
+		/* Subscribe for messages */
+		messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
+	}
+
+	++forwards->interested;
+	ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
+		endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
+		forwards->interested,
+		app->name);
+
+	ao2_ref(forwards, -1);
+	return 0;
 }
 
 static int subscribe_endpoint(struct stasis_app *app, void *obj)
@@ -1236,16 +1349,20 @@
 
 int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
 {
-	if (!app || !endpoint_id) {
+	if (!app) {
 		return -1;
 	}
 
-	return unsubscribe(app, "endpoint", endpoint_id, 0);
+	return unsubscribe(app, "endpoint", S_OR(endpoint_id, ENDPOINT_ALL), 0);
 }
 
 int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
 {
 	RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+	if (ast_strlen_zero(endpoint_id)) {
+		endpoint_id = ENDPOINT_ALL;
+	}
 	forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
 	return forwards != NULL;
 }
diff --git a/res/stasis/app.h b/res/stasis/app.h
index 59574f5..2c8db1c 100644
--- a/res/stasis/app.h
+++ b/res/stasis/app.h
@@ -36,6 +36,19 @@
  */
 struct stasis_app;
 
+enum stasis_app_subscription_model {
+	/*
+	 * \brief An application must manually subscribe to each
+	 * resource that it cares about. This is the default approach.
+	 */
+	STASIS_APP_SUBSCRIBE_MANUAL,
+	/*
+	 * \brief An application is automatically subscribed to all
+	 * resources in Asterisk, even if it does not control them.
+	 */
+	STASIS_APP_SUBSCRIBE_ALL
+};
+
 /*!
  * \brief Create a res_stasis application.
  *
@@ -45,7 +58,7 @@
  * \return New \c res_stasis application.
  * \return \c NULL on error.
  */
-struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
+struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
 
 /*!
  * \brief Tears down an application.
diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c
index fd7cf9f..229a3a6 100644
--- a/res/stasis/messaging.c
+++ b/res/stasis/messaging.c
@@ -38,6 +38,11 @@
 #include "messaging.h"
 
 /*!
+ * \brief Subscription to all technologies
+ */
+#define TECH_WILDCARD "__AST_ALL_TECH"
+
+/*!
  * \brief Number of buckets for the \ref endpoint_subscriptions container
  */
 #define ENDPOINTS_NUM_BUCKETS 127
@@ -219,10 +224,14 @@
 	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
 		sub = AST_VECTOR_GET(&tech_subscriptions, i);
 
-		if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
-		            || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+		if (!sub) {
+			continue;
+		}
+
+		if (!strcmp(sub->token, TECH_WILDCARD)
+		    || !strncasecmp(sub->token, buf, strlen(sub->token))
+		    || !strncasecmp(sub->token, buf, strlen(sub->token))) {
 			ast_rwlock_unlock(&tech_subscriptions_lock);
-			sub = NULL; /* No ref bump! */
 			goto match;
 		}
 
@@ -231,6 +240,7 @@
 
 	sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
 	if (sub) {
+		ao2_ref(sub, -1);
 		goto match;
 	}
 
@@ -238,7 +248,6 @@
 	return 0;
 
 match:
-	ao2_cleanup(sub);
 	return 1;
 }
 
@@ -301,7 +310,8 @@
 			continue;
 		}
 
-		if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+		if (!strcmp(sub->token, TECH_WILDCARD)
+		    || !strncasecmp(sub->token, buf, strlen(sub->token))) {
 			ast_rwlock_unlock(&tech_subscriptions_lock);
 			ao2_bump(sub);
 			endpoint_name = buf;
@@ -374,7 +384,7 @@
 {
 	struct message_subscription *sub = NULL;
 
-	if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+	if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
 		sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
 	} else {
 		int i;
@@ -383,7 +393,7 @@
 		for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
 			sub = AST_VECTOR_GET(&tech_subscriptions, i);
 
-			if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+			if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
 				ao2_bump(sub);
 				break;
 			}
@@ -400,10 +410,6 @@
 	RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
 
 	endpoint = ast_endpoint_find_by_id(endpoint_id);
-	if (!endpoint) {
-		return;
-	}
-
 	sub = get_subscription(endpoint);
 	if (!sub) {
 		return;
@@ -417,11 +423,11 @@
 
 	AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
 	if (AST_VECTOR_SIZE(&sub->applications) == 0) {
-		if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+		if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
 			ao2_unlink(endpoint_subscriptions, sub);
 		} else {
 			ast_rwlock_wrlock(&tech_subscriptions_lock);
-			AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+			AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
 				messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
 			ast_rwlock_unlock(&tech_subscriptions_lock);
 		}
@@ -429,9 +435,9 @@
 	ao2_unlock(sub);
 	ao2_ref(sub, -1);
 
-	ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+	ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
 	ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
-		app_name, ast_endpoint_get_id(endpoint));
+		app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
 }
 
 static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@@ -442,12 +448,12 @@
 		return sub;
 	}
 
-	sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+	sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
 	if (!sub) {
 		return NULL;
 	}
 
-	if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+	if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
 		ao2_link(endpoint_subscriptions, sub);
 	} else {
 		ast_rwlock_wrlock(&tech_subscriptions_lock);
@@ -482,9 +488,9 @@
 	AST_VECTOR_APPEND(&sub->applications, tuple);
 	ao2_unlock(sub);
 
-	ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+	ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
 	ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
-		app_name, ast_endpoint_get_id(endpoint));
+		app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
 
 	return 0;
 }
diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json
index 8d74900..6276fc2 100644
--- a/rest-api/api-docs/events.json
+++ b/rest-api/api-docs/events.json
@@ -26,6 +26,14 @@
 							"required": true,
 							"allowMultiple": true,
 							"dataType": "string"
+						},
+						{
+							"name": "subscribeAll",
+							"description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
+							"paramType": "query",
+							"required": false,
+							"allowMultiple": false,
+							"dataType": "boolean"
 						}
 					]
 				}

-- 
To view, visit https://gerrit.asterisk.org/1193
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4a943b4db24442cf28bc64b24bfd541249790ad6
Gerrit-PatchSet: 2
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Matt Jordan <mjordan at digium.com>



More information about the asterisk-code-review mailing list