[Asterisk-code-review] bridges: Remove reliance on stasis caching (asterisk[master])

George Joseph asteriskteam at digium.com
Thu Sep 20 14:44:46 CDT 2018


George Joseph has uploaded this change for review. ( https://gerrit.asterisk.org/10227 )


Change subject: bridges: Remove reliance on stasis caching
......................................................................

bridges: Remove reliance on stasis caching

* The bridging core no longer uses the stasis cache for bridge
  snapshots.  Bridge snapshots are now stored in an rbtree
  container in stasis_bridges.c.

* The following APIs are no longer available since the stasis cache
  is no longer used:
    ast_bridge_topic_cached()
    ast_bridge_topic_all_cached()

* A topic pool is now used for individual bridge topics.

* The ast_bridge_cache() function now returns an ao2_container of
  ast_bridge_snapshots rather than a container of stasis_messages,
  so you can't (and don't need to) call stasis_cache functions on
  it.

* The ast_bridge_topic_all() function now returns a normal topic,
  not a cached one, so you can't use stasis cache functions on it
  either.

* The ast_bridge_snapshot_type() stasis message now has the
  ast_bridge_snapshot_update structure as its data.  It contains
  the previous snapshot and the new one.

* ast_bridge_snapshot_get_latest() still returns the latest
  snapshot.

* cdr, cel, manager and ari have been updated to use the new
  arrangement.
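
For code that consumes ast_bridge_snapshot_type() messages, the old/new
pair described above can be interpreted roughly as follows.  This is a
minimal standalone sketch, not code from this change: struct
bridge_snapshot, struct bridge_snapshot_update and classify_update() are
hypothetical stand-ins that model only the fields used here, and the
event names mirror the ones res/stasis/app.c uses in this patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for struct ast_bridge_snapshot (two fields only). */
struct bridge_snapshot {
	const char *uniqueid;
	const char *video_source_id;
};

/* Hypothetical stand-in for struct ast_bridge_snapshot_update. */
struct bridge_snapshot_update {
	struct bridge_snapshot *old_snapshot;	/* NULL when the bridge was just created */
	struct bridge_snapshot *new_snapshot;	/* NULL when the bridge was destroyed */
};

/*
 * Map an update to the event a subscriber would raise.
 * Returns NULL when neither creation, destruction nor a video source
 * change is detected.
 */
static const char *classify_update(const struct bridge_snapshot_update *update)
{
	assert(update != NULL);

	if (!update->new_snapshot) {
		return "BridgeDestroyed";
	}
	if (!update->old_snapshot) {
		return "BridgeCreated";
	}
	if (strcmp(update->new_snapshot->video_source_id,
			update->old_snapshot->video_source_id)) {
		return "BridgeVideoSourceChanged";
	}

	return NULL;
}
```

In a real handler the update would presumably be obtained with
stasis_message_data(message), as the manager and ARI handlers touched by
this change do, rather than built by hand.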

Change-Id: I7049b80efa88676ce5c4666f818fa18ad1985369
---
M CHANGES
M UPGRADE.txt
M apps/confbridge/confbridge_manager.c
M include/asterisk/bridge.h
M include/asterisk/stasis_bridges.h
M main/Makefile
M main/bridge.c
M main/cdr.c
M main/cel.c
M main/manager_bridges.c
M main/stasis_bridges.c
M res/ari/resource_bridges.c
M res/stasis/app.c
13 files changed, 321 insertions(+), 212 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/27/10227/1

diff --git a/CHANGES b/CHANGES
index 26748f7..c2a99d0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,25 @@
 ==============================================================================
 
 ------------------------------------------------------------------------------
+--- Functionality changes from Asterisk 16 to Asterisk 17 --------------------
+------------------------------------------------------------------------------
+
+Bridging
+------------------
+ * The bridging core no longer uses the stasis cache for bridge snapshots.
+   The following APIs are no longer available:
+       ast_bridge_topic_cached()
+       ast_bridge_topic_all_cached()
+   The ast_bridge_cache() function now returns an ao2_container of
+   ast_bridge_snapshots rather than a container of stasis_messages,
+   so you can't call stasis_cache functions on it.
+   The ast_bridge_topic_all() function now returns a normal topic,
+   not a cached one, so you can't use stasis cache functions on it either.
+   The ast_bridge_snapshot_type() stasis message now has the
+   ast_bridge_snapshot_update structure as its data.
+   ast_bridge_snapshot_get_latest() still returns the latest snapshot.
+
+------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 15 to Asterisk 16 --------------------
 ------------------------------------------------------------------------------
 
diff --git a/UPGRADE.txt b/UPGRADE.txt
index b7bbf3b..13de3eb 100644
--- a/UPGRADE.txt
+++ b/UPGRADE.txt
@@ -25,3 +25,19 @@
 === UPGRADE-16.txt  -- Upgrade info for 15 to 16
 ===========================================================
 
+New in 17.0.0:
+
+Bridging
+------------------
+ * The bridging core no longer uses the stasis cache for bridge snapshots.
+   The following APIs are no longer available:
+       ast_bridge_topic_cached()
+       ast_bridge_topic_all_cached()
+   The ast_bridge_cache() function now returns an ao2_container of
+   ast_bridge_snapshots rather than a container of stasis_messages,
+   so you can't call stasis_cache functions on it.
+   The ast_bridge_topic_all() function now returns a normal topic,
+   not a cached one, so you can't use stasis cache functions on it either.
+   The ast_bridge_snapshot_type() stasis message now has the
+   ast_bridge_snapshot_update structure as its data.
+   ast_bridge_snapshot_get_latest() still returns the latest snapshot.
diff --git a/apps/confbridge/confbridge_manager.c b/apps/confbridge/confbridge_manager.c
index 51112ba..bd8c5a1 100644
--- a/apps/confbridge/confbridge_manager.c
+++ b/apps/confbridge/confbridge_manager.c
@@ -769,7 +769,7 @@
 	STASIS_MESSAGE_TYPE_INIT(confbridge_welcome_type);
 
 	bridge_state_router = stasis_message_router_create(
-		ast_bridge_topic_all_cached());
+		ast_bridge_topic_all());
 
 	if (!bridge_state_router) {
 		return -1;
diff --git a/include/asterisk/bridge.h b/include/asterisk/bridge.h
index 3584085..c39b4f7 100644
--- a/include/asterisk/bridge.h
+++ b/include/asterisk/bridge.h
@@ -307,7 +307,7 @@
 	/*! Private information unique to the bridge technology */
 	void *tech_pvt;
 	/*! Per-bridge topics */
-	struct stasis_cp_single *topics;
+	struct stasis_topic *topic;
 	/*! Call ID associated with the bridge */
 	ast_callid callid;
 	/*! Linked list of channels participating in the bridge */
diff --git a/include/asterisk/stasis_bridges.h b/include/asterisk/stasis_bridges.h
index a455a5b..d56c083 100644
--- a/include/asterisk/stasis_bridges.h
+++ b/include/asterisk/stasis_bridges.h
@@ -64,6 +64,21 @@
 	enum ast_bridge_video_mode_type video_mode;
 };
 
+struct ast_bridge_snapshot_update {
+	struct ast_bridge_snapshot *old_snapshot;
+	struct ast_bridge_snapshot *new_snapshot;
+};
+
+/*!
+ * \since 13.24
+ * \since 15.6
+ * \since 16.1
+ * \brief Delete a bridge snapshot from the cache
+ *
+ * \param uniqueid The bridge's uniqueid
+ */
+void ast_bridge_cache_delete_snapshot(const char *uniqueid);
+
 /*!
  * \since 12
  * \brief Generate a snapshot of the bridge state. This is an ao2 object, so
@@ -101,22 +116,6 @@
 
 /*!
  * \since 12
- * \brief A topic which publishes the events for a particular bridge.
- *
- * \ref ast_bridge_snapshot messages are replaced with stasis_cache_update
- * messages.
- *
- * If the given \a bridge is \c NULL, ast_bridge_topic_all_cached() is returned.
- *
- * \param bridge Bridge for which to get a topic or \c NULL.
- *
- * \retval Topic for bridge's events.
- * \retval ast_bridge_topic_all() if \a bridge is \c NULL.
- */
-struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge);
-
-/*!
- * \since 12
  * \brief A topic which publishes the events for all bridges.
  * \retval Topic for all bridge events.
  */
@@ -124,19 +123,10 @@
 
 /*!
  * \since 12
- * \brief A caching topic which caches \ref ast_bridge_snapshot messages from
- * ast_bridge_events_all(void).
- *
- * \retval Caching topic for all bridge events.
- */
-struct stasis_topic *ast_bridge_topic_all_cached(void);
-
-/*!
- * \since 12
  * \brief Backend cache for ast_bridge_topic_all_cached().
  * \retval Cache of \ref ast_bridge_snapshot.
  */
-struct stasis_cache *ast_bridge_cache(void);
+struct ao2_container *ast_bridge_cache(void);
 
 /*!
  * \since 12
@@ -511,6 +501,17 @@
 
 /*!
  * \internal
+ * \since 13.24
+ * \since 15.6
+ * \since 16.1
+ * \brief Clean up bridge cache and publish destroy
+ *
+ * \param bridge The bridge to clean up
+ */
+void bridge_topics_destroy(struct ast_bridge *bridge);
+
+/*!
+ * \internal
  * \brief Initialize the stasis bridging topic and message types
  * \retval 0 on success
  * \retval -1 on failure
diff --git a/main/Makefile b/main/Makefile
index 1cb2c25..a22a1ad 100644
--- a/main/Makefile
+++ b/main/Makefile
@@ -167,6 +167,7 @@
 sched.o: _ASTCFLAGS+=$(call get_menuselect_cflags,DEBUG_SCHEDULER DUMP_SCHEDULER)
 tcptls.o: _ASTCFLAGS+=$(OPENSSL_INCLUDE) -Wno-deprecated-declarations
 uuid.o: _ASTCFLAGS+=$(UUID_INCLUDE)
+stasis.o: _ASTCFLAGS+=$(call get_menuselect_cflags,AO2_DEBUG)
 
 
 OBJS:=$(sort $(OBJS))
diff --git a/main/bridge.c b/main/bridge.c
index 2b347fd..9af26ee 100644
--- a/main/bridge.c
+++ b/main/bridge.c
@@ -650,25 +650,6 @@
 	}
 }
 
-static struct stasis_message *create_bridge_snapshot_message(struct ast_bridge *bridge)
-{
-	RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
-
-	if (!ast_bridge_snapshot_type()) {
-		return NULL;
-	}
-
-	ast_bridge_lock(bridge);
-	snapshot = ast_bridge_snapshot_create(bridge);
-	ast_bridge_unlock(bridge);
-
-	if (!snapshot) {
-		return NULL;
-	}
-
-	return stasis_message_create(ast_bridge_snapshot_type(), snapshot);
-}
-
 static void destroy_bridge(void *obj)
 {
 	struct ast_bridge *bridge = obj;
@@ -677,17 +658,7 @@
 		bridge->uniqueid, bridge->v_table->name);
 
 	if (bridge->construction_completed) {
-		RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
-
-		clear_msg = create_bridge_snapshot_message(bridge);
-		if (clear_msg) {
-			RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
-			msg = stasis_cache_clear_create(clear_msg);
-			if (msg) {
-				stasis_publish(ast_bridge_topic(bridge), msg);
-			}
-		}
+		bridge_topics_destroy(bridge);
 	}
 
 	/* Do any pending actions in the context of destruction. */
@@ -726,8 +697,6 @@
 
 	cleanup_video_mode(bridge);
 
-	stasis_cp_single_unsubscribe(bridge->topics);
-
 	ast_string_field_free_memory(bridge);
 }
 
@@ -5069,20 +5038,18 @@
 	int wordlen = strlen(word);
 	struct ao2_container *cached_bridges;
 	struct ao2_iterator iter;
-	struct stasis_message *msg;
+	struct ast_bridge_snapshot *snapshot;
 
-	cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
+	cached_bridges = ast_bridge_cache();
 	if (!cached_bridges) {
 		return NULL;
 	}
 
 	iter = ao2_iterator_init(cached_bridges, 0);
-	for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) {
-		struct ast_bridge_snapshot *snapshot = stasis_message_data(msg);
-
+	for (; (snapshot = ao2_iterator_next(&iter)); ao2_ref(snapshot, -1)) {
 		if (!strncasecmp(word, snapshot->uniqueid, wordlen)) {
 			if (ast_cli_completion_add(ast_strdup(snapshot->uniqueid))) {
-				ao2_ref(msg, -1);
+				ao2_ref(snapshot, -1);
 				break;
 			}
 		}
@@ -5098,9 +5065,9 @@
 #define FORMAT_HDR "%-36s %5s %-15s %s\n"
 #define FORMAT_ROW "%-36s %5u %-15s %s\n"
 
-	RAII_VAR(struct ao2_container *, cached_bridges, NULL, ao2_cleanup);
+	struct ao2_container *cached_bridges;
 	struct ao2_iterator iter;
-	struct stasis_message *msg;
+	struct ast_bridge_snapshot *snapshot;
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -5113,7 +5080,7 @@
 		return NULL;
 	}
 
-	cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
+	cached_bridges = ast_bridge_cache();
 	if (!cached_bridges) {
 		ast_cli(a->fd, "Failed to retrieve cached bridges\n");
 		return CLI_SUCCESS;
@@ -5122,9 +5089,7 @@
 	ast_cli(a->fd, FORMAT_HDR, "Bridge-ID", "Chans", "Type", "Technology");
 
 	iter = ao2_iterator_init(cached_bridges, 0);
-	for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) {
-		struct ast_bridge_snapshot *snapshot = stasis_message_data(msg);
-
+	for (; (snapshot = ao2_iterator_next(&iter)); ao2_ref(snapshot, -1)) {
 		ast_cli(a->fd, FORMAT_ROW,
 			snapshot->uniqueid,
 			snapshot->num_channels,
@@ -5132,6 +5097,8 @@
 			S_OR(snapshot->technology, "<unknown>"));
 	}
 	ao2_iterator_destroy(&iter);
+	ao2_ref(cached_bridges, -1);
+
 	return CLI_SUCCESS;
 
 #undef FORMAT_HDR
@@ -5159,7 +5126,6 @@
 
 static char *handle_bridge_show_specific(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	struct ast_bridge_snapshot *snapshot;
 
 	switch (cmd) {
@@ -5180,18 +5146,17 @@
 		return CLI_SHOWUSAGE;
 	}
 
-	msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
-	if (!msg) {
+	snapshot = ast_bridge_snapshot_get_latest(a->argv[2]);
+	if (!snapshot) {
 		ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
 		return CLI_SUCCESS;
 	}
-
-	snapshot = stasis_message_data(msg);
 	ast_cli(a->fd, "Id: %s\n", snapshot->uniqueid);
 	ast_cli(a->fd, "Type: %s\n", S_OR(snapshot->subclass, "<unknown>"));
 	ast_cli(a->fd, "Technology: %s\n", S_OR(snapshot->technology, "<unknown>"));
 	ast_cli(a->fd, "Num-Channels: %u\n", snapshot->num_channels);
 	ao2_callback(snapshot->channels, OBJ_NODATA, bridge_show_specific_print_channel, a);
+	ao2_ref(snapshot, -1);
 
 	return CLI_SUCCESS;
 }
diff --git a/main/cdr.c b/main/cdr.c
index 1c47e24..0ffb9cd 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -4306,7 +4306,7 @@
 	if (!channel_subscription) {
 		return -1;
 	}
-	bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
+	bridge_subscription = stasis_forward_all(ast_bridge_topic_all(), cdr_topic);
 	if (!bridge_subscription) {
 		return -1;
 	}
diff --git a/main/cel.c b/main/cel.c
index 0ec728e..738589f 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -1460,7 +1460,7 @@
 	}
 
 	cel_bridge_forwarder = stasis_forward_all(
-		ast_bridge_topic_all_cached(),
+		ast_bridge_topic_all(),
 		cel_aggregation_topic);
 	if (!cel_bridge_forwarder) {
 		return -1;
diff --git a/main/manager_bridges.c b/main/manager_bridges.c
index b7059f4..40007d4 100644
--- a/main/manager_bridges.c
+++ b/main/manager_bridges.c
@@ -330,22 +330,15 @@
 				    struct stasis_message *message)
 {
 	RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
-	struct stasis_cache_update *update;
-	struct ast_bridge_snapshot *old_snapshot;
-	struct ast_bridge_snapshot *new_snapshot;
+	struct ast_bridge_snapshot_update *update;
 	size_t i;
 
 	update = stasis_message_data(message);
 
-	ast_assert(ast_bridge_snapshot_type() == update->type);
-
-	old_snapshot = stasis_message_data(update->old_snapshot);
-	new_snapshot = stasis_message_data(update->new_snapshot);
-
 	for (i = 0; i < ARRAY_LEN(bridge_monitors); ++i) {
 		RAII_VAR(struct ast_manager_event_blob *, event, NULL, ao2_cleanup);
 
-		event = bridge_monitors[i](old_snapshot, new_snapshot);
+		event = bridge_monitors[i](update->old_snapshot, update->new_snapshot);
 		if (!event) {
 			continue;
 		}
@@ -354,7 +347,7 @@
 		if (!bridge_event_string) {
 			bridge_event_string =
 				ast_manager_build_bridge_state_string(
-					new_snapshot ? new_snapshot : old_snapshot);
+					update->new_snapshot ? update->new_snapshot : update->old_snapshot);
 			if (!bridge_event_string) {
 				return;
 			}
@@ -461,7 +454,7 @@
 
 static int send_bridge_list_item_cb(void *obj, void *arg, void *data, int flags)
 {
-	struct ast_bridge_snapshot *snapshot = stasis_message_data(obj);
+	struct ast_bridge_snapshot *snapshot = obj;
 	struct mansession *s = arg;
 	struct bridge_list_data *list_data = data;
 	RAII_VAR(struct ast_str *, bridge_info, ast_manager_build_bridge_state_string(snapshot), ast_free);
@@ -498,7 +491,7 @@
 		ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
 	}
 
-	bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
+	bridges = ast_bridge_cache();
 	if (!bridges) {
 		astman_send_error(s, m, "Internal error");
 		return -1;
@@ -564,7 +557,6 @@
 	const char *id = astman_get_header(m, "ActionID");
 	const char *bridge_uniqueid = astman_get_header(m, "BridgeUniqueid");
 	RAII_VAR(struct ast_str *, id_text, ast_str_create(128), ast_free);
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_str *, bridge_info, NULL, ast_free);
 	struct ast_bridge_snapshot *snapshot;
 	struct bridge_list_data list_data;
@@ -583,13 +575,7 @@
 		ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
 	}
 
-	msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid);
-	if (!msg) {
-		astman_send_error(s, m, "Specified BridgeUniqueid not found");
-		return 0;
-	}
-
-	snapshot = stasis_message_data(msg);
+	snapshot = ast_bridge_snapshot_get_latest(bridge_uniqueid);
 	bridge_info = ast_manager_build_bridge_state_string(snapshot);
 	if (!bridge_info) {
 		astman_send_error(s, m, "Internal error");
@@ -706,7 +692,7 @@
 		return -1;
 	}
 
-	bridge_topic = ast_bridge_topic_all_cached();
+	bridge_topic = ast_bridge_topic_all();
 	if (!bridge_topic) {
 		return -1;
 	}
@@ -721,7 +707,7 @@
 		return -1;
 	}
 
-	ret |= stasis_message_router_add_cache_update(bridge_state_router,
+	ret |= stasis_message_router_add(bridge_state_router,
 		ast_bridge_snapshot_type(), bridge_snapshot_update, NULL);
 
 	ret |= stasis_message_router_add(bridge_state_router,
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c
index 59b9685..d07fb1e 100644
--- a/main/stasis_bridges.c
+++ b/main/stasis_bridges.c
@@ -155,7 +155,8 @@
 	struct stasis_message *msg,
 	const struct stasis_message_sanitizer *sanitize);
 
-static struct stasis_cp_all *bridge_cache_all;
+static struct stasis_topic *bridge_topic_all;
+static struct stasis_topic_pool *bridge_topic_pool;
 
 /*!
  * @{ \brief Define bridge message types.
@@ -175,33 +176,16 @@
 	.to_ami = attended_transfer_to_ami);
 /*! @} */
 
-struct stasis_cache *ast_bridge_cache(void)
+static struct ao2_container *bridge_cache;
+
+struct ao2_container *ast_bridge_cache(void)
 {
-	return stasis_cp_all_cache(bridge_cache_all);
+	return ao2_bump(bridge_cache);
 }
 
 struct stasis_topic *ast_bridge_topic_all(void)
 {
-	return stasis_cp_all_topic(bridge_cache_all);
-}
-
-struct stasis_topic *ast_bridge_topic_all_cached(void)
-{
-	return stasis_cp_all_topic_cached(bridge_cache_all);
-}
-
-int bridge_topics_init(struct ast_bridge *bridge)
-{
-	if (ast_strlen_zero(bridge->uniqueid)) {
-		ast_log(LOG_ERROR, "Bridge id initialization required\n");
-		return -1;
-	}
-	bridge->topics = stasis_cp_single_create(bridge_cache_all,
-		bridge->uniqueid);
-	if (!bridge->topics) {
-		return -1;
-	}
-	return 0;
+	return bridge_topic_all;
 }
 
 struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
@@ -210,16 +194,7 @@
 		return ast_bridge_topic_all();
 	}
 
-	return stasis_cp_single_topic(bridge->topics);
-}
-
-struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge)
-{
-	if (!bridge) {
-		return ast_bridge_topic_all_cached();
-	}
-
-	return stasis_cp_single_topic_cached(bridge->topics);
+	return bridge->topic;
 }
 
 /*! \brief Destructor for bridge snapshots */
@@ -292,44 +267,158 @@
 	return snapshot;
 }
 
-void ast_bridge_publish_state(struct ast_bridge *bridge)
+void ast_bridge_cache_delete_snapshot(const char *uniqueid)
 {
-	struct ast_bridge_snapshot *snapshot;
-	struct stasis_message *msg;
+	ao2_find(bridge_cache, uniqueid, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
+}
 
-	if (!ast_bridge_snapshot_type()) {
-		return;
+static void bridge_snapshot_update_dtor(void *obj)
+{
+	struct ast_bridge_snapshot_update *update = obj;
+
+	ast_debug(3, "Update: %p  Old: %s  New: %s\n", update,
+		update->old_snapshot ? update->old_snapshot->uniqueid : "<none>",
+		update->new_snapshot ? update->new_snapshot->uniqueid : "<none>");
+	ao2_cleanup(update->old_snapshot);
+	ao2_cleanup(update->new_snapshot);
+}
+
+static struct ast_bridge_snapshot_update *bridge_snapshot_update_create(
+	struct ast_bridge_snapshot *old, struct ast_bridge_snapshot *new)
+{
+	struct ast_bridge_snapshot_update *update;
+
+	update = ao2_alloc_options(sizeof(*update), bridge_snapshot_update_dtor,
+			AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!update) {
+		return NULL;
 	}
+	update->old_snapshot = old;
+	update->new_snapshot = new;
+
+	ast_debug(3, "Update: %p  Old: %s  New: %s\n", update,
+		update->old_snapshot ? update->old_snapshot->uniqueid : "<none>",
+		update->new_snapshot ? update->new_snapshot->uniqueid : "<none>");
+
+	return update;
+}
+
+int bridge_topics_init(struct ast_bridge *bridge)
+{
+	if (ast_strlen_zero(bridge->uniqueid)) {
+		ast_log(LOG_ERROR, "Bridge id initialization required\n");
+		return -1;
+	}
+
+	bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
+	if (!bridge->topic) {
+		return -1;
+	}
+
+	return 0;
+}
+
+void bridge_topics_destroy(struct ast_bridge *bridge)
+{
+	struct ast_bridge_snapshot *old_snapshot;
+	struct ast_bridge_snapshot_update *update;
+	struct stasis_message *msg;
 
 	ast_assert(bridge != NULL);
 
-	snapshot = ast_bridge_snapshot_create(bridge);
-	if (!snapshot) {
+	old_snapshot = ast_bridge_snapshot_get_latest(bridge->uniqueid);
+	if (!old_snapshot) {
+		old_snapshot = ast_bridge_snapshot_create(bridge);
+		if (!old_snapshot) {
+			return;
+		}
+	} else {
+		ast_bridge_cache_delete_snapshot(bridge->uniqueid);
+	}
+
+	update = bridge_snapshot_update_create(old_snapshot, NULL);
+	if (!update) {
+		ao2_ref(old_snapshot, -1);
 		return;
 	}
 
-	msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot);
-	ao2_ref(snapshot, -1);
+	msg = stasis_message_create(ast_bridge_snapshot_type(), update);
+	ao2_ref(update, -1);
 	if (!msg) {
 		return;
 	}
 
 	stasis_publish(ast_bridge_topic(bridge), msg);
 	ao2_ref(msg, -1);
+
+	stasis_topic_pool_delete_topic(bridge_topic_pool, stasis_topic_name(ast_bridge_topic(bridge)));
+}
+
+void ast_bridge_publish_state(struct ast_bridge *bridge)
+{
+	struct ast_bridge_snapshot *new_snapshot;
+	struct ast_bridge_snapshot *old_snapshot;
+	struct ast_bridge_snapshot_update *update;
+	struct stasis_message *msg;
+
+	ast_assert(bridge != NULL);
+
+	new_snapshot = ast_bridge_snapshot_create(bridge);
+	if (!new_snapshot) {
+		return;
+	}
+
+	old_snapshot = ast_bridge_snapshot_get_latest(bridge->uniqueid);
+
+	/* We're transferring the snapshot references to the update */
+	update = bridge_snapshot_update_create(old_snapshot, new_snapshot);
+	if (!update) {
+		ao2_cleanup(old_snapshot);
+		ao2_ref(new_snapshot, -1);
+
+		return;
+	}
+
+	msg = stasis_message_create(ast_bridge_snapshot_type(), update);
+	ao2_ref(update, -1);
+	if (!msg) {
+		return;
+	}
+
+	/* The link of the new automatically replaces the old */
+	ao2_link(bridge_cache, new_snapshot);
+	stasis_publish(ast_bridge_topic(bridge), msg);
+	ao2_ref(msg, -1);
 }
 
 static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
 	struct ast_bridge_blob *obj)
 {
+	struct ast_bridge_snapshot *old_snapshot;
+	struct ast_bridge_snapshot_update *update;
 	struct stasis_message *msg;
 
 	ast_assert(obj != NULL);
 
-	msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
+	old_snapshot = ast_bridge_snapshot_get_latest(bridge->uniqueid);
+
+	/* We're transferring only the old snapshot reference to the update */
+	update = bridge_snapshot_update_create(old_snapshot, ao2_bump(obj->bridge));
+	if (!update) {
+		ao2_cleanup(old_snapshot);
+		ao2_ref(obj->bridge, -1);
+
+		return;
+	}
+
+	msg = stasis_message_create(ast_bridge_snapshot_type(), update);
+	ao2_ref(update, -1);
 	if (!msg) {
 		return;
 	}
 
+	/* The link of the new automatically replaces the old */
+	ao2_link(bridge_cache, obj->bridge);
 	stasis_publish(ast_bridge_topic(bridge), msg);
 	ao2_ref(msg, -1);
 }
@@ -1252,35 +1341,15 @@
 
 struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
 {
-	struct stasis_message *message;
 	struct ast_bridge_snapshot *snapshot;
 
 	ast_assert(!ast_strlen_zero(uniqueid));
 
-	message = stasis_cache_get(ast_bridge_cache(),
-			ast_bridge_snapshot_type(),
-			uniqueid);
-	if (!message) {
-		return NULL;
-	}
-
-	snapshot = ao2_bump(stasis_message_data(message));
-	ao2_ref(message, -1);
+	snapshot = ao2_find(bridge_cache, uniqueid, OBJ_SEARCH_KEY);
 
 	return snapshot;
 }
 
-/*! \brief snapshot ID getter for caching topic */
-static const char *bridge_snapshot_get_id(struct stasis_message *msg)
-{
-	struct ast_bridge_snapshot *snapshot;
-	if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
-		return NULL;
-	}
-	snapshot = stasis_message_data(msg);
-	return snapshot->uniqueid;
-}
-
 static void stasis_bridging_cleanup(void)
 {
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
@@ -1290,8 +1359,73 @@
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_blind_transfer_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_attended_transfer_type);
 
-	ao2_cleanup(bridge_cache_all);
-	bridge_cache_all = NULL;
+	ao2_cleanup(bridge_topic_pool);
+	bridge_topic_pool = NULL;
+	ao2_cleanup(bridge_topic_all);
+	bridge_topic_all = NULL;
+
+	ao2_container_unregister("bridge_cache");
+	ao2_cleanup(bridge_cache);
+}
+
+/*!
+ * \internal
+ * \brief Bridge ao2 container sort function.
+ * \since 12.0.0
+ *
+ * \param obj_left pointer to the (user-defined part) of an object.
+ * \param obj_right pointer to the (user-defined part) of an object.
+ * \param flags flags from ao2_callback()
+ *   OBJ_POINTER - if set, 'obj_right', is an object.
+ *   OBJ_KEY - if set, 'obj_right', is a search key item that is not an object.
+ *   OBJ_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
+ *
+ * \retval <0 if obj_left < obj_right
+ * \retval =0 if obj_left == obj_right
+ * \retval >0 if obj_left > obj_right
+ */
+static int bridge_snapshot_sort_cmp(const void *obj_left, const void *obj_right, int flags)
+{
+	const struct ast_bridge_snapshot *bridge_left = obj_left;
+	const struct ast_bridge_snapshot *bridge_right = obj_right;
+	const char *right_key = obj_right;
+	int cmp;
+
+	switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+	default:
+	case OBJ_POINTER:
+		right_key = bridge_right->uniqueid;
+		/* Fall through */
+	case OBJ_KEY:
+		cmp = strcmp(bridge_left->uniqueid, right_key);
+		break;
+	case OBJ_PARTIAL_KEY:
+		cmp = strncmp(bridge_left->uniqueid, right_key, strlen(right_key));
+		break;
+	}
+	return cmp;
+}
+
+/*!
+ * \internal
+ * \brief Print bridge object key (name).
+ * \since 12.0.0
+ *
+ * \param v_obj A pointer to the object we want the key printed.
+ * \param where User data needed by prnt to determine where to put output.
+ * \param prnt Print output callback function to use.
+ *
+ * \return Nothing
+ */
+static void bridge_snapshot_prnt_obj(void *v_obj, void *where, ao2_prnt_fn *prnt)
+{
+	struct ast_bridge_snapshot *bridge = v_obj;
+
+	if (!bridge) {
+		return;
+	}
+	prnt(where, "%s %s chans:%u",
+		bridge->uniqueid, bridge->technology, bridge->num_channels);
 }
 
 int ast_stasis_bridging_init(void)
@@ -1300,10 +1434,19 @@
 
 	ast_register_cleanup(stasis_bridging_cleanup);
 
-	bridge_cache_all = stasis_cp_all_create("ast_bridge_topic_all",
-		bridge_snapshot_get_id);
+	bridge_cache = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+		AO2_CONTAINER_ALLOC_OPT_DUPS_REPLACE, bridge_snapshot_sort_cmp, NULL);
+	if (!bridge_cache) {
+		return -1;
+	}
+	ao2_container_register("bridge_cache", bridge_cache, bridge_snapshot_prnt_obj);
 
-	if (!bridge_cache_all) {
+	bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
+	if (!bridge_topic_all) {
+		return -1;
+	}
+	bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
+	if (!bridge_topic_pool) {
 		return -1;
 	}
 
diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c
index 9808288..738acdd 100644
--- a/res/ari/resource_bridges.c
+++ b/res/ari/resource_bridges.c
@@ -884,22 +884,12 @@
 	struct ast_ari_bridges_list_args *args,
 	struct ast_ari_response *response)
 {
-	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
 	RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
 	struct ao2_iterator i;
 	void *obj;
 
-	cache = ast_bridge_cache();
-	if (!cache) {
-		ast_ari_response_error(
-			response, 500, "Internal Server Error",
-			"Message bus not initialized");
-		return;
-	}
-	ao2_ref(cache, +1);
-
-	snapshots = stasis_cache_dump(cache, ast_bridge_snapshot_type());
+	snapshots = ast_bridge_cache();
 	if (!snapshots) {
 		ast_ari_response_alloc_failed(response);
 		return;
@@ -913,17 +903,19 @@
 
 	i = ao2_iterator_init(snapshots, 0);
 	while ((obj = ao2_iterator_next(&i))) {
-		RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
-		struct ast_bridge_snapshot *snapshot = stasis_message_data(msg);
+		struct ast_bridge_snapshot *snapshot = obj;
 		struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
 
+		ao2_ref(snapshot, -1);
 		if (!json_bridge || ast_json_array_append(json, json_bridge)) {
 			ao2_iterator_destroy(&i);
+			ao2_ref(snapshots, -1);
 			ast_ari_response_alloc_failed(response);
 			return;
 		}
 	}
 	ao2_iterator_destroy(&i);
+	ao2_ref(snapshots, -1);
 
 	ast_ari_response_ok(response, ast_json_ref(json));
 }
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 18ac7d6..7ca7210 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -182,12 +182,8 @@
 		forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
 			app->topic);
 	}
-	forwards->topic_cached_forward = stasis_forward_all(
-		bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
-		app->topic);
 
-	if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
-		/* Half-subscribed is a bad thing */
+	if (!forwards->topic_forward && bridge) {
 		forwards_unsubscribe(forwards);
 		ao2_ref(forwards, -1);
 		return NULL;
@@ -689,33 +685,23 @@
 {
 	struct ast_json *json = NULL;
 	struct stasis_app *app = data;
-	struct stasis_cache_update *update;
-	struct ast_bridge_snapshot *new_snapshot;
-	struct ast_bridge_snapshot *old_snapshot;
+	struct ast_bridge_snapshot_update *update;
 	const struct timeval *tv;
 
-	ast_assert(stasis_message_type(message) == stasis_cache_update_type());
-
 	update = stasis_message_data(message);
 
-	ast_assert(update->type == ast_bridge_snapshot_type());
+	tv = stasis_message_timestamp(message);
 
-	new_snapshot = stasis_message_data(update->new_snapshot);
-	old_snapshot = stasis_message_data(update->old_snapshot);
-	tv = update->new_snapshot ?
-		stasis_message_timestamp(update->new_snapshot) :
-		stasis_message_timestamp(message);
-
-	if (!new_snapshot) {
-		json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
-	} else if (!old_snapshot) {
-		json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
-	} else if (new_snapshot && old_snapshot
-		&& strcmp(new_snapshot->video_source_id, old_snapshot->video_source_id)) {
-		json = simple_bridge_event("BridgeVideoSourceChanged", new_snapshot, tv);
-		if (json && !ast_strlen_zero(old_snapshot->video_source_id)) {
+	if (!update->new_snapshot) {
+		json = simple_bridge_event("BridgeDestroyed", update->old_snapshot, tv);
+	} else if (!update->old_snapshot) {
+		json = simple_bridge_event("BridgeCreated", update->new_snapshot, tv);
+	} else if (update->new_snapshot && update->old_snapshot
+		&& strcmp(update->new_snapshot->video_source_id, update->old_snapshot->video_source_id)) {
+		json = simple_bridge_event("BridgeVideoSourceChanged", update->new_snapshot, tv);
+		if (json && !ast_strlen_zero(update->old_snapshot->video_source_id)) {
 			ast_json_object_set(json, "old_video_source_id",
-				ast_json_string_create(old_snapshot->video_source_id));
+				ast_json_string_create(update->old_snapshot->video_source_id));
 		}
 	}
 
@@ -724,8 +710,8 @@
 		ast_json_unref(json);
 	}
 
-	if (!new_snapshot && old_snapshot) {
-		unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
+	if (!update->new_snapshot && update->old_snapshot) {
+		unsubscribe(app, "bridge", update->old_snapshot->uniqueid, 1);
 	}
 }
 
@@ -984,7 +970,7 @@
 		return NULL;
 	}
 
-	res |= stasis_message_router_add_cache_update(app->router,
+	res |= stasis_message_router_add(app->router,
 		ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
 
 	res |= stasis_message_router_add_cache_update(app->router,

-- 
To view, visit https://gerrit.asterisk.org/10227
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7049b80efa88676ce5c4666f818fa18ad1985369
Gerrit-Change-Number: 10227
Gerrit-PatchSet: 1
Gerrit-Owner: George Joseph <gjoseph at digium.com>