[Asterisk-code-review] stasis: Use an implementation specific channel snapshot cache. (asterisk[master])

Jenkins2 asteriskteam at digium.com
Mon Nov 26 13:22:28 CST 2018


Jenkins2 has submitted this change and it was merged. ( https://gerrit.asterisk.org/10478 )

Change subject: stasis: Use an implementation specific channel snapshot cache.
......................................................................

stasis: Use an implementation specific channel snapshot cache.

Channels no longer use the Stasis cache for channel snapshots. Instead,
they are stored in a hash table in stasis_channels, which reduces the
number of Stasis messages created and allows better storage.

As a result the following APIs are no longer available since the stasis
cache is no longer used:
ast_channel_topic_cached()
ast_channel_topic_all_cached()

The ast_channel_cache_all() and ast_channel_cache_by_name() functions
now return an ao2_container of ast_channel_snapshots rather than
a container of stasis_messages, so you can't (and don't need
to) call stasis_cache functions on them.
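
As a rough illustration of what this means for callers, the sketch below
walks a collection whose elements are the snapshots themselves, with no
stasis_message unwrapping step in between. It is a standalone model, not
Asterisk code: struct chan_snap and count_snapshots_of_type() are
hypothetical names, and a plain array stands in for the ao2_container.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a channel snapshot; illustration only. */
struct chan_snap {
	const char *name; /* channel name, e.g. "PJSIP/alice-00000001" */
	const char *type; /* channel technology, e.g. "PJSIP" */
};

/*
 * Count the snapshots that belong to one channel technology.
 * Each element is already a snapshot, so it can be inspected directly.
 */
static size_t count_snapshots_of_type(const struct chan_snap *const *snapshots,
	size_t count, const char *type)
{
	size_t i;
	size_t matches = 0;

	for (i = 0; i < count; i++) {
		if (snapshots[i] && !strcmp(snapshots[i]->type, type)) {
			matches++;
		}
	}

	return matches;
}
```

In the patch itself, the cli_message_to_snapshot() callback in
channels/pjsip/cli_commands.c follows the same idea: obj is treated as a
snapshot directly instead of a stasis_message.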

The ast_channel_topic_all() function now returns a normal topic, not
a cached one, so you can't use stasis cache functions on it either.

The ast_channel_snapshot_type() stasis message now has the
ast_channel_snapshot_update structure as its data. It contains the
previous snapshot and the new one.
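
A subscriber can tell creation, change and destruction apart from the
two snapshots alone. The following is a minimal standalone sketch of that
decision, assuming (as the header comment in stasis_channels.h states)
that a new snapshot is always present and that a dead channel is marked
by a flag on it. The types struct snap and struct snap_update, the
SNAP_FLAG_DEAD constant and classify_update() are hypothetical stand-ins,
not the Asterisk definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration only; not the Asterisk types. */
#define SNAP_FLAG_DEAD (1u << 0) /* models AST_FLAG_DEAD on a snapshot */

struct snap {
	const char *name;
	unsigned int flags;
};

struct snap_update {
	const struct snap *old_snapshot; /* NULL on the first update */
	const struct snap *new_snapshot; /* assumed to be always present */
};

enum snap_event {
	SNAP_CREATED,
	SNAP_CHANGED,
	SNAP_DESTROYED
};

/* Decide what kind of transition an update describes. */
static enum snap_event classify_update(const struct snap_update *update)
{
	assert(update != NULL && update->new_snapshot != NULL);

	/* A channel going away is signalled by the flag, not by a NULL new snapshot. */
	if (update->new_snapshot->flags & SNAP_FLAG_DEAD) {
		return SNAP_DESTROYED;
	}

	/* No previous snapshot means this is the first one for the channel. */
	if (!update->old_snapshot) {
		return SNAP_CREATED;
	}

	return SNAP_CHANGED;
}
```

The CDR and CEL handlers changed by this patch make comparable checks:
a missing old snapshot is treated as channel creation, and the dead flag
on the new snapshot triggers finalization.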

ast_channel_snapshot_get_latest() still returns the latest snapshot.

The latest snapshot is now stored on the channel itself, which avoids
a cache lookup whenever a Stasis message that has the snapshot as its
payload is created.
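
The ownership pattern behind this can be sketched with a tiny
reference-counted object. This is a standalone model under stated
assumptions, not the Asterisk implementation: struct rc_snap, struct chan
and every function below are hypothetical, a plain counter stands in for
ao2 reference counting, and no locking is shown (the aoc.c hunk in this
patch takes the channel lock around the equivalent read).

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical reference-counted snapshot; stands in for an ao2 object. */
struct rc_snap {
	int refs;
	int state;
};

/* Hypothetical channel that owns one reference to its latest snapshot. */
struct chan {
	struct rc_snap *snapshot;
};

static struct rc_snap *rc_snap_alloc(int state)
{
	struct rc_snap *s = malloc(sizeof(*s));

	if (s) {
		s->refs = 1; /* the caller holds the initial reference */
		s->state = state;
	}
	return s;
}

static struct rc_snap *rc_snap_ref(struct rc_snap *s)
{
	if (s) {
		s->refs++;
	}
	return s;
}

static void rc_snap_unref(struct rc_snap *s)
{
	if (s && --s->refs == 0) {
		free(s);
	}
}

/* Replace the channel's snapshot. The new reference is taken before the
 * old one is dropped, so setting the same snapshot twice stays safe. */
static void chan_set_snapshot(struct chan *c, struct rc_snap *s)
{
	struct rc_snap *old = c->snapshot;

	c->snapshot = rc_snap_ref(s);
	rc_snap_unref(old);
}

/* Return a new reference to the latest snapshot; the caller must unref it. */
static struct rc_snap *chan_get_snapshot(struct chan *c)
{
	return rc_snap_ref(c->snapshot);
}
```

A publisher would call chan_get_snapshot() to attach the current
snapshot to an outgoing message and release it when the message is
destroyed, without consulting any separate cache.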

ASTERISK-28102

Change-Id: I9334febff60a82d7c39703e49059fa3a68825786
---
M CHANGES
M UPGRADE.txt
M apps/app_agent_pool.c
M apps/confbridge/confbridge_manager.c
M channels/chan_pjsip.c
M channels/pjsip/cli_commands.c
M include/asterisk/channel.h
M include/asterisk/stasis_channels.h
M main/aoc.c
M main/app.c
M main/bridge.c
M main/cdr.c
M main/cel.c
M main/channel.c
M main/channel_internal_api.c
M main/cli.c
M main/endpoints.c
M main/manager.c
M main/manager_bridges.c
M main/manager_channels.c
M main/stasis_channels.c
M res/ari/resource_channels.c
M res/res_agi.c
M res/res_chan_stats.c
M res/stasis/app.c
M res/stasis/control.c
M tests/test_cel.c
M tests/test_stasis_endpoints.c
28 files changed, 456 insertions(+), 551 deletions(-)

Approvals:
  Joshua Colp: Looks good to me, approved
  Jenkins2: Approved for Submit



diff --git a/CHANGES b/CHANGES
index 2c98062..111a030 100644
--- a/CHANGES
+++ b/CHANGES
@@ -19,6 +19,22 @@
      https://wiki.asterisk.org/wiki/x/tAHOAQ
      https://wiki.asterisk.org/wiki/x/hYCLAQ
 
+Channels
+------------------
+ * The core no longer uses the stasis cache for channel snapshots.
+   The following APIs are no longer available:
+       ast_channel_topic_cached()
+       ast_channel_topic_all_cached()
+   The ast_channel_cache_all() and ast_channel_cache_by_name() functions
+   now return an ao2_container of ast_channel_snapshots rather than a
+   container of stasis_messages, so you can't call stasis_cache
+   functions on them.
+   The ast_channel_topic_all() function now returns a normal topic,
+   not a cached one, so you can't use stasis cache functions on it either.
+   The ast_channel_snapshot_type() stasis message now has the
+   ast_channel_snapshot_update structure as its data.
+   ast_channel_snapshot_get_latest() still returns the latest snapshot.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 16.0.0 to Asterisk 16.1.0 ------------
 ------------------------------------------------------------------------------
diff --git a/UPGRADE.txt b/UPGRADE.txt
index 3cf8e27..c299d83 100644
--- a/UPGRADE.txt
+++ b/UPGRADE.txt
@@ -43,3 +43,18 @@
 
 res_xmpp:
  - The JabberStatus application, deprecated in Asterisk 12, has been removed.
+
+Channels:
+ - The core no longer uses the stasis cache for channel snapshots.
+   The following APIs are no longer available:
+       ast_channel_topic_cached()
+       ast_channel_topic_all_cached()
+   The ast_channel_cache_all() and ast_channel_cache_by_name() functions
+   now return an ao2_container of ast_channel_snapshots rather than a
+   container of stasis_messages, so you can't call stasis_cache
+   functions on them.
+   The ast_channel_topic_all() function now returns a normal topic,
+   not a cached one, so you can't use stasis cache functions on it either.
+   The ast_channel_snapshot_type() stasis message now has the
+   ast_channel_snapshot_update structure as its data.
+   ast_channel_snapshot_get_latest() still returns the latest snapshot.
diff --git a/apps/app_agent_pool.c b/apps/app_agent_pool.c
index 805c403..5bd6a4d 100644
--- a/apps/app_agent_pool.c
+++ b/apps/app_agent_pool.c
@@ -1448,7 +1448,7 @@
 		return;
 	}
 
-	ast_channel_publish_cached_blob(chan, ast_channel_agent_login_type(), blob);
+	ast_channel_publish_blob(chan, ast_channel_agent_login_type(), blob);
 }
 
 static void send_agent_logoff(struct ast_channel *chan, const char *agent, long logintime)
@@ -1464,7 +1464,7 @@
 		return;
 	}
 
-	ast_channel_publish_cached_blob(chan, ast_channel_agent_logoff_type(), blob);
+	ast_channel_publish_blob(chan, ast_channel_agent_logoff_type(), blob);
 }
 
 /*!
diff --git a/apps/confbridge/confbridge_manager.c b/apps/confbridge/confbridge_manager.c
index e88bbc2..2d85033 100644
--- a/apps/confbridge/confbridge_manager.c
+++ b/apps/confbridge/confbridge_manager.c
@@ -783,7 +783,7 @@
 	}
 
 	channel_state_router = stasis_message_router_create(
-		ast_channel_topic_all_cached());
+		ast_channel_topic_all());
 
 	if (!channel_state_router) {
 		manager_confbridge_shutdown();
diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c
index 9edd989..d0a74cd 100644
--- a/channels/chan_pjsip.c
+++ b/channels/chan_pjsip.c
@@ -1135,7 +1135,6 @@
 	RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", data), ao2_cleanup);
 	enum ast_device_state state = AST_DEVICE_UNKNOWN;
 	RAII_VAR(struct ast_endpoint_snapshot *, endpoint_snapshot, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
 	struct ast_devstate_aggregate aggregate;
 	int num, inuse = 0;
 
@@ -1156,27 +1155,20 @@
 		state = AST_DEVICE_NOT_INUSE;
 	}
 
-	if (!endpoint_snapshot->num_channels || !(cache = ast_channel_cache())) {
+	if (!endpoint_snapshot->num_channels) {
 		return state;
 	}
 
 	ast_devstate_aggregate_init(&aggregate);
 
-	ao2_ref(cache, +1);
-
 	for (num = 0; num < endpoint_snapshot->num_channels; num++) {
-		RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 		struct ast_channel_snapshot *snapshot;
 
-		msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
-			endpoint_snapshot->channel_ids[num]);
-
-		if (!msg) {
+		snapshot = ast_channel_snapshot_get_latest(endpoint_snapshot->channel_ids[num]);
+		if (!snapshot) {
 			continue;
 		}
 
-		snapshot = stasis_message_data(msg);
-
 		if (chan_pjsip_get_hold(snapshot->uniqueid)) {
 			ast_devstate_aggregate_add(&aggregate, AST_DEVICE_ONHOLD);
 		} else {
@@ -1187,6 +1179,8 @@
 			(snapshot->state == AST_STATE_BUSY)) {
 			inuse++;
 		}
+
+		ao2_ref(snapshot, -1);
 	}
 
 	if (endpoint->devicestate_busy_at && (inuse == endpoint->devicestate_busy_at)) {
diff --git a/channels/pjsip/cli_commands.c b/channels/pjsip/cli_commands.c
index 33d0e02..9a8dc29 100644
--- a/channels/pjsip/cli_commands.c
+++ b/channels/pjsip/cli_commands.c
@@ -169,9 +169,8 @@
 
 static int cli_message_to_snapshot(void *obj, void *arg, int flags)
 {
-	struct stasis_message *message = obj;
+	struct ast_channel_snapshot *snapshot = obj;
 	struct ao2_container *snapshots = arg;
-	struct ast_channel_snapshot *snapshot = stasis_message_data(message);
 
 	if (!strcmp(snapshot->type, "PJSIP")) {
 		ao2_link(snapshots, snapshot);
@@ -198,8 +197,7 @@
 {
 	struct ao2_container *child_container;
 	regex_t regexbuf;
-	RAII_VAR(struct ao2_container *, parent_container,
-		stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()), ao2_cleanup);
+	RAII_VAR(struct ao2_container *, parent_container, ast_channel_cache_by_name(), ao2_cleanup);
 
 	if (!parent_container) {
 		return NULL;
diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h
index 3f22cdd..9627ae2 100644
--- a/include/asterisk/channel.h
+++ b/include/asterisk/channel.h
@@ -148,6 +148,15 @@
 #define AST_MAX_PUBLIC_UNIQUEID 149
 
 /*!
+ * The number of buckets to store channels or channel information
+ */
+#ifdef LOW_MEMORY
+#define AST_NUM_CHANNEL_BUCKETS 61
+#else
+#define AST_NUM_CHANNEL_BUCKETS 1567
+#endif
+
+/*!
  * Maximum size of an internal Asterisk channel unique ID.
  *
  * \details
@@ -2650,6 +2659,17 @@
 void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b);
 
 /*!
+ * \brief Swap snapshots between two channels
+ * \param a First channel
+ * \param b Second channel
+ * \return void
+ *
+ * \note
+ * This is used in masquerade to exchange snapshots
+ */
+void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b);
+
+/*!
  * \brief Set uniqueid and linkedid string value only (not time)
  * \param chan The channel to set the uniqueid to
  * \param uniqueid The uniqueid to set
@@ -4236,6 +4256,8 @@
 void ast_channel_adsicpe_set(struct ast_channel *chan, enum ast_channel_adsicpe value);
 enum ast_channel_state ast_channel_state(const struct ast_channel *chan);
 ast_callid ast_channel_callid(const struct ast_channel *chan);
+struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan);
+void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot);
 
 /*!
  * \pre chan is locked
@@ -4562,21 +4584,6 @@
 struct stasis_topic *ast_channel_topic(struct ast_channel *chan);
 
 /*!
- * \since 12
- * \brief A topic which publishes the events for a particular channel.
- *
- * \ref ast_channel_snapshot messages are replaced with \ref stasis_cache_update
- *
- * If the given \a chan is \c NULL, ast_channel_topic_all_cached() is returned.
- *
- * \param chan Channel, or \c NULL.
- *
- * \retval Topic for channel's events.
- * \retval ast_channel_topic_all() if \a chan is \c NULL.
- */
-struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan);
-
-/*!
  * \brief Get the bridge associated with a channel
  * \since 12.0.0
  *
diff --git a/include/asterisk/stasis_channels.h b/include/asterisk/stasis_channels.h
index 4843617..2aeff6f 100644
--- a/include/asterisk/stasis_channels.h
+++ b/include/asterisk/stasis_channels.h
@@ -76,6 +76,23 @@
 };
 
 /*!
+ * \since 17
+ * \brief Structure representing a change of snapshot of channel state.
+ *
+ * While not enforced programmatically, this object is shared across multiple
+ * threads, and should be treated as an immutable object.
+ *
+ * \note This structure will not have a transition of an old snapshot with no
+ * new snapshot to indicate that a channel has gone away. A new snapshot will
+ * always exist and a channel going away can be determined by checking for the
+ * AST_FLAG_DEAD flag on the new snapshot.
+ */
+struct ast_channel_snapshot_update {
+	struct ast_channel_snapshot *old_snapshot; /*!< The old channel snapshot */
+	struct ast_channel_snapshot *new_snapshot; /*!< The new channel snapshot */
+};
+
+/*!
  * \since 12
  * \brief Blob of data associated with a channel.
  *
@@ -94,7 +111,7 @@
  */
 struct ast_multi_channel_blob;
 
-struct stasis_cp_all *ast_channel_cache_all(void);
+struct ao2_container *ast_channel_cache_all(void);
 
 /*!
  * \since 12
@@ -105,34 +122,17 @@
 
 /*!
  * \since 12
- * \brief A caching topic which caches \ref ast_channel_snapshot messages from
- * ast_channel_events_all(void).
- *
- * \retval Topic for all channel events.
- */
-struct stasis_topic *ast_channel_topic_all_cached(void);
-
-/*!
- * \since 12
- * \brief Primary channel cache, indexed by Uniqueid.
- *
- * \retval Cache of \ref ast_channel_snapshot.
- */
-struct stasis_cache *ast_channel_cache(void);
-
-/*!
- * \since 12
  * \brief Secondary channel cache, indexed by name.
  *
  * \retval Cache of \ref ast_channel_snapshot.
  */
-struct stasis_cache *ast_channel_cache_by_name(void);
+struct ao2_container *ast_channel_cache_by_name(void);
 
 /*!
  * \since 12
- * \brief Message type for \ref ast_channel_snapshot.
+ * \brief Message type for \ref ast_channel_snapshot_update.
  *
- * \retval Message type for \ref ast_channel_snapshot.
+ * \retval Message type for \ref ast_channel_snapshot_update.
  */
 struct stasis_message_type *ast_channel_snapshot_type(void);
 
@@ -176,6 +176,18 @@
 struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name);
 
 /*!
+ * \since 17
+ * \brief Send the final channel snapshot for a channel, thus removing it from cache
+ *
+ * \pre chan is locked
+ *
+ * \param chan The channel to send the final channel snapshot for
+ *
+ * \note This will also remove the cached snapshot from the channel itself
+ */
+void ast_channel_publish_final_snapshot(struct ast_channel *chan);
+
+/*!
  * \since 12
  * \brief Creates a \ref ast_channel_blob message.
  *
@@ -303,6 +315,8 @@
  * \param type Type of stasis message.
  * \param blob The blob being published. (NULL if no blob)
  *
+ * \note This will use the current snapshot on the channel and will not generate a new one.
+ *
  * \return Nothing
  */
 void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type,
@@ -557,17 +571,6 @@
 		const char *dialstatus,
 		const char *forward);
 
-/*!
- * \since 12
- * \brief Publish in the \ref ast_channel_topic a \ref ast_channel_snapshot
- * message indicating a change in channel state
- *
- * \pre chan is locked
- *
- * \param chan The channel whose state has changed
- */
-void ast_publish_channel_state(struct ast_channel *chan);
-
 /*! @} */
 
 /*!
diff --git a/main/aoc.c b/main/aoc.c
index 253c745..b8cf301 100644
--- a/main/aoc.c
+++ b/main/aoc.c
@@ -1849,7 +1849,9 @@
 	}
 
 	if (chan) {
-		aoc_event->snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
+		ast_channel_lock(chan);
+		aoc_event->snapshot = ao2_bump(ast_channel_snapshot(chan));
+		ast_channel_unlock(chan);
 		if (!aoc_event->snapshot) {
 			ao2_ref(aoc_event, -1);
 			return;
diff --git a/main/app.c b/main/app.c
index 953b77d..ec74490 100644
--- a/main/app.c
+++ b/main/app.c
@@ -3244,15 +3244,7 @@
 	mwi_state->old_msgs = old_msgs;
 
 	if (!ast_strlen_zero(channel_id)) {
-		struct stasis_message *chan_message;
-
-		chan_message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
-			channel_id);
-		if (chan_message) {
-			mwi_state->snapshot = stasis_message_data(chan_message);
-			ao2_ref(mwi_state->snapshot, +1);
-		}
-		ao2_cleanup(chan_message);
+		mwi_state->snapshot = ast_channel_snapshot_get_latest(channel_id);
 	}
 
 	if (eid) {
diff --git a/main/bridge.c b/main/bridge.c
index d6e7a51..024c6ab 100644
--- a/main/bridge.c
+++ b/main/bridge.c
@@ -5150,16 +5150,15 @@
 {
 	const char *uniqueid = obj;
 	struct ast_cli_args *a = arg;
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	struct ast_channel_snapshot *snapshot;
 
-	msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid);
-	if (!msg) {
+	snapshot = ast_channel_snapshot_get_latest(uniqueid);
+	if (!snapshot) {
 		return 0;
 	}
-	snapshot = stasis_message_data(msg);
 
 	ast_cli(a->fd, "Channel: %s\n", snapshot->name);
+	ao2_ref(snapshot, -1);
 
 	return 0;
 }
diff --git a/main/cdr.c b/main/cdr.c
index 1c47e24..e321c22 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -186,14 +186,6 @@
 	</configInfo>
  ***/
 
-
-/* The prime here should be similar in size to the channel container. */
-#ifdef LOW_MEMORY
-#define NUM_CDR_BUCKETS 61
-#else
-#define NUM_CDR_BUCKETS 769
-#endif
-
 #define DEFAULT_ENABLED "1"
 #define DEFAULT_BATCHMODE "0"
 #define DEFAULT_UNANSWERED "0"
@@ -2056,9 +2048,9 @@
 
 /*!
  * \internal
- * \brief Filter a channel cache update
+ * \brief Filter a channel snapshot update
  */
-static int filter_channel_cache_message(struct ast_channel_snapshot *old_snapshot,
+static int filter_channel_snapshot_message(struct ast_channel_snapshot *old_snapshot,
 		struct ast_channel_snapshot *new_snapshot)
 {
 	int ret = 0;
@@ -2256,52 +2248,38 @@
 }
 
 /*!
- * \brief Handler for Stasis-Core channel cache update messages
+ * \brief Handler for channel snapshot update messages
  * \param data Passed on
  * \param sub The stasis subscription for this message callback
  * \param topic The topic this message was published for
  * \param message The message
  */
-static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+static void handle_channel_snapshot_update_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
 	struct cdr_object *cdr;
-	struct stasis_cache_update *update = stasis_message_data(message);
-	struct ast_channel_snapshot *old_snapshot;
-	struct ast_channel_snapshot *new_snapshot;
+	struct ast_channel_snapshot_update *update = stasis_message_data(message);
 	struct cdr_object *it_cdr;
 
-	ast_assert(update != NULL);
-	ast_assert(ast_channel_snapshot_type() == update->type);
-
-	old_snapshot = stasis_message_data(update->old_snapshot);
-	new_snapshot = stasis_message_data(update->new_snapshot);
-
-	if (filter_channel_cache_message(old_snapshot, new_snapshot)) {
+	if (filter_channel_snapshot_message(update->old_snapshot, update->new_snapshot)) {
 		return;
 	}
 
-	if (new_snapshot && !old_snapshot) {
-		cdr = cdr_object_alloc(new_snapshot);
+	if (update->new_snapshot && !update->old_snapshot) {
+		cdr = cdr_object_alloc(update->new_snapshot);
 		if (!cdr) {
 			return;
 		}
 		cdr->is_root = 1;
 		ao2_link(active_cdrs_master, cdr);
 	} else {
-		const char *uniqueid;
-
-		uniqueid = new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid;
-		cdr = ao2_find(active_cdrs_master, uniqueid, OBJ_SEARCH_KEY);
+		cdr = ao2_find(active_cdrs_master, update->new_snapshot->uniqueid, OBJ_SEARCH_KEY);
 	}
 
 	/* Handle Party A */
 	if (!cdr) {
-		const char *name;
-
-		name = new_snapshot ? new_snapshot->name : old_snapshot->name;
-		ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", name);
+		ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", update->new_snapshot->name);
 		ast_assert(0);
-	} else if (new_snapshot) {
+	} else {
 		int all_reject = 1;
 
 		ao2_lock(cdr);
@@ -2309,21 +2287,23 @@
 			if (!it_cdr->fn_table->process_party_a) {
 				continue;
 			}
-			all_reject &= it_cdr->fn_table->process_party_a(it_cdr, new_snapshot);
+			all_reject &= it_cdr->fn_table->process_party_a(it_cdr, update->new_snapshot);
 		}
-		if (all_reject && check_new_cdr_needed(old_snapshot, new_snapshot)) {
+		if (all_reject && check_new_cdr_needed(update->old_snapshot, update->new_snapshot)) {
 			/* We're not hung up and we have a new snapshot - we need a new CDR */
 			struct cdr_object *new_cdr;
 
 			new_cdr = cdr_object_create_and_append(cdr);
 			if (new_cdr) {
-				new_cdr->fn_table->process_party_a(new_cdr, new_snapshot);
+				new_cdr->fn_table->process_party_a(new_cdr, update->new_snapshot);
 			}
 		}
 		ao2_unlock(cdr);
-	} else {
+	}
+
+	if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
 		ao2_lock(cdr);
-		CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, old_snapshot->name);
+		CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, update->old_snapshot->name);
 		for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
 			cdr_object_finalize(it_cdr);
 		}
@@ -2335,12 +2315,14 @@
 	}
 
 	/* Handle Party B */
-	if (new_snapshot) {
+	if (update->new_snapshot) {
 		ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
-			cdr_object_update_party_b, (char *) new_snapshot->name, new_snapshot);
-	} else {
+			cdr_object_update_party_b, (char *) update->new_snapshot->name, update->new_snapshot);
+	}
+
+	if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
 		ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
-			cdr_object_finalize_party_b, (char *) old_snapshot->name, old_snapshot);
+			cdr_object_finalize_party_b, (char *) update->new_snapshot->name, update->new_snapshot);
 	}
 
 	ao2_cleanup(cdr);
@@ -4302,7 +4284,7 @@
 		return 0;
 	}
 
-	channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
+	channel_subscription = stasis_forward_all(ast_channel_topic_all(), cdr_topic);
 	if (!channel_subscription) {
 		return -1;
 	}
@@ -4522,7 +4504,7 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+	stasis_message_router_add(stasis_router, ast_channel_snapshot_type(), handle_channel_snapshot_update_message, NULL);
 	stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
 	stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
 	stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
@@ -4530,14 +4512,14 @@
 	stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL);
 
 	active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
-		NUM_CDR_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn);
+		AST_NUM_CHANNEL_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn);
 	if (!active_cdrs_master) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
 	ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn);
 
 	active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
-		NUM_CDR_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn);
+		AST_NUM_CHANNEL_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn);
 	if (!active_cdrs_all) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
diff --git a/main/cel.c b/main/cel.c
index feb3bee..97e35ad 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -888,14 +888,6 @@
 {
 	int is_hungup, was_hungup;
 
-	if (!new_snapshot) {
-		cel_report_event(old_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL);
-		if (ast_cel_track_event(AST_CEL_LINKEDID_END)) {
-			check_retire_linkedid(old_snapshot);
-		}
-		return;
-	}
-
 	if (!old_snapshot) {
 		cel_report_event(new_snapshot, AST_CEL_CHANNEL_START, NULL, NULL, NULL);
 		return;
@@ -915,6 +907,11 @@
 		cel_report_event(new_snapshot, AST_CEL_HANGUP, NULL, extra, NULL);
 		ast_json_unref(extra);
 		ao2_cleanup(dialstatus);
+
+		cel_report_event(new_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL);
+		if (ast_cel_track_event(AST_CEL_LINKEDID_END)) {
+			check_retire_linkedid(new_snapshot);
+		}
 		return;
 	}
 
@@ -928,7 +925,7 @@
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
 {
-	if (!old_snapshot || !new_snapshot) {
+	if (!old_snapshot) {
 		return;
 	}
 
@@ -946,8 +943,7 @@
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
 {
-	if (new_snapshot && old_snapshot
-		&& !strcmp(old_snapshot->appl, new_snapshot->appl)) {
+	if (old_snapshot && !strcmp(old_snapshot->appl, new_snapshot->appl)) {
 		return;
 	}
 
@@ -957,7 +953,7 @@
 	}
 
 	/* new snapshot has an application, start it */
-	if (new_snapshot && !ast_strlen_zero(new_snapshot->appl)) {
+	if (!ast_strlen_zero(new_snapshot->appl)) {
 		cel_report_event(new_snapshot, AST_CEL_APP_START, NULL, NULL, NULL);
 	}
 }
@@ -984,22 +980,15 @@
 static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
 	struct stasis_message *message)
 {
-	struct stasis_cache_update *update = stasis_message_data(message);
-	if (ast_channel_snapshot_type() == update->type) {
-		struct ast_channel_snapshot *old_snapshot;
-		struct ast_channel_snapshot *new_snapshot;
-		size_t i;
+	struct ast_channel_snapshot_update *update = stasis_message_data(message);
+	size_t i;
 
-		old_snapshot = stasis_message_data(update->old_snapshot);
-		new_snapshot = stasis_message_data(update->new_snapshot);
+	if (cel_filter_channel_snapshot(update->old_snapshot) || cel_filter_channel_snapshot(update->new_snapshot)) {
+		return;
+	}
 
-		if (cel_filter_channel_snapshot(old_snapshot) || cel_filter_channel_snapshot(new_snapshot)) {
-			return;
-		}
-
-		for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) {
-			cel_channel_monitors[i](old_snapshot, new_snapshot);
-		}
+	for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) {
+		cel_channel_monitors[i](update->old_snapshot, update->new_snapshot);
 	}
 }
 
@@ -1453,7 +1442,7 @@
 	}
 
 	cel_channel_forwarder = stasis_forward_all(
-		ast_channel_topic_all_cached(),
+		ast_channel_topic_all(),
 		cel_aggregation_topic);
 	if (!cel_channel_forwarder) {
 		return -1;
@@ -1498,7 +1487,7 @@
 		6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
 
 	ret |= stasis_message_router_add(cel_state_router,
-		stasis_cache_update_type(),
+		ast_channel_snapshot_type(),
 		cel_snapshot_update_cb,
 		NULL);
 
diff --git a/main/channel.c b/main/channel.c
index 6c6e9f7..3d8e244 100644
--- a/main/channel.c
+++ b/main/channel.c
@@ -116,12 +116,6 @@
 /*! \brief the list of registered channel types */
 static AST_RWLIST_HEAD_STATIC(backends, chanlist);
 
-#ifdef LOW_MEMORY
-#define NUM_CHANNEL_BUCKETS 61
-#else
-#define NUM_CHANNEL_BUCKETS 1567
-#endif
-
 /*! \brief All active channels on the system */
 static struct ao2_container *channels;
 
@@ -635,38 +629,6 @@
 	return -1;
 }
 
-static struct stasis_message *create_channel_snapshot_message(struct ast_channel *channel)
-{
-	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
-
-	if (!ast_channel_snapshot_type()) {
-		return NULL;
-	}
-
-	ast_channel_lock(channel);
-	snapshot = ast_channel_snapshot_create(channel);
-	ast_channel_unlock(channel);
-	if (!snapshot) {
-		return NULL;
-	}
-
-	return stasis_message_create(ast_channel_snapshot_type(), snapshot);
-}
-
-static void publish_cache_clear(struct ast_channel *chan)
-{
-	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
-
-	clear_msg = create_channel_snapshot_message(chan);
-	if (!clear_msg) {
-		return;
-	}
-
-	message = stasis_cache_clear_create(clear_msg);
-	stasis_publish(ast_channel_topic(chan), message);
-}
-
 /*! \brief Gives the string form of a given channel state.
  *
  * \note This function is not reentrant.
@@ -1236,7 +1198,9 @@
 				     "musicclass", musicclass);
 	}
 
-	ast_channel_publish_cached_blob(chan, ast_channel_hold_type(), blob);
+	ast_channel_lock(chan);
+	ast_channel_publish_blob(chan, ast_channel_hold_type(), blob);
+	ast_channel_unlock(chan);
 
 	res = ast_queue_frame(chan, &f);
 
@@ -1250,7 +1214,9 @@
 	struct ast_frame f = { AST_FRAME_CONTROL, .subclass.integer = AST_CONTROL_UNHOLD };
 	int res;
 
-	ast_channel_publish_cached_blob(chan, ast_channel_unhold_type(), NULL);
+	ast_channel_lock(chan);
+	ast_channel_publish_blob(chan, ast_channel_unhold_type(), NULL);
+	ast_channel_unlock(chan);
 
 	res = ast_queue_frame(chan, &f);
 
@@ -2230,9 +2196,8 @@
 		ast_assert(!ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE));
 
 		ast_channel_lock(chan);
-		ast_channel_publish_snapshot(chan);
+		ast_channel_publish_final_snapshot(chan);
 		ast_channel_unlock(chan);
-		publish_cache_clear(chan);
 	}
 
 	ast_channel_lock(chan);
@@ -3344,7 +3309,7 @@
 		return;
 	}
 
-	ast_channel_publish_cached_blob(chan, ast_channel_dtmf_begin_type(), blob);
+	ast_channel_publish_blob(chan, ast_channel_dtmf_begin_type(), blob);
 }
 
 static void send_dtmf_end_event(struct ast_channel *chan,
@@ -3361,7 +3326,7 @@
 		return;
 	}
 
-	ast_channel_publish_cached_blob(chan, ast_channel_dtmf_end_type(), blob);
+	ast_channel_publish_blob(chan, ast_channel_dtmf_end_type(), blob);
 }
 
 static void ast_read_generator_actions(struct ast_channel *chan, struct ast_frame *f)
@@ -6819,6 +6784,9 @@
 	/* Make sure the Stasis topic on the channel is updated appropriately */
 	ast_channel_internal_swap_topics(clonechan, original);
 
+	/* The old snapshots need to follow the channels so the snapshot update is correct */
+	ast_channel_internal_swap_snapshots(clonechan, original);
+
 	/* Swap channel names. This uses ast_channel_name_set directly, so we
 	 * don't get any spurious rename events.
 	 */
@@ -7246,7 +7214,7 @@
 
 	ast_channel_state_set(chan, state);
 
-	ast_publish_channel_state(chan);
+	ast_channel_publish_snapshot(chan);
 
 	/* We have to pass AST_DEVICE_UNKNOWN here because it is entirely possible that the channel driver
 	 * for this channel is using the callback method for device state. If we pass in an actual state here
@@ -7856,7 +7824,7 @@
 
 int ast_channels_init(void)
 {
-	channels = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, NUM_CHANNEL_BUCKETS,
+	channels = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, AST_NUM_CHANNEL_BUCKETS,
 		ast_channel_hash_cb, NULL, ast_channel_cmp_cb);
 	if (!channels) {
 		return -1;
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index a963a7d..436ba23 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -42,7 +42,6 @@
 #include "asterisk/channel_internal.h"
 #include "asterisk/endpoints.h"
 #include "asterisk/indications.h"
-#include "asterisk/stasis_cache_pattern.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stringfields.h"
@@ -215,12 +214,13 @@
 	char dtmf_digit_to_emulate;			/*!< Digit being emulated */
 	char sending_dtmf_digit;			/*!< Digit this channel is currently sending out. (zero if not sending) */
 	struct timeval sending_dtmf_tv;		/*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
-	struct stasis_cp_single *topics;		/*!< Topic for all channel's events */
+	struct stasis_topic *topic;		/*!< Topic for this channel */
+	struct stasis_forward *channel_forward; /*!< Subscription for event forwarding to all channel topic */
 	struct stasis_forward *endpoint_forward;	/*!< Subscription for event forwarding to endpoint's topic */
-	struct stasis_forward *endpoint_cache_forward; /*!< Subscription for cache updates to endpoint's topic */
 	struct ast_stream_topology *stream_topology; /*!< Stream topology */
 	void *stream_topology_change_source; /*!< Source that initiated a stream topology change */
 	struct ast_stream *default_streams[AST_MEDIA_TYPE_END]; /*!< Default streams indexed by media type */
+	struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */
 };
 
 /*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1381,11 +1381,25 @@
 
 void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b)
 {
-	struct stasis_cp_single *temp;
+	struct stasis_topic *topic;
+	struct stasis_forward *forward;
 
-	temp = a->topics;
-	a->topics = b->topics;
-	b->topics = temp;
+	topic = a->topic;
+	a->topic = b->topic;
+	b->topic = topic;
+
+	forward = a->channel_forward;
+	a->channel_forward = b->channel_forward;
+	b->channel_forward = forward;
+}
+
+void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b)
+{
+	struct ast_channel_snapshot *snapshot;
+
+	snapshot = a->snapshot;
+	a->snapshot = b->snapshot;
+	b->snapshot = snapshot;
 }
 
 void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid)
@@ -1404,11 +1418,11 @@
 
 	ast_string_field_free_memory(chan);
 
+	chan->channel_forward = stasis_forward_cancel(chan->channel_forward);
 	chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
-	chan->endpoint_cache_forward = stasis_forward_cancel(chan->endpoint_cache_forward);
 
-	stasis_cp_single_unsubscribe(chan->topics);
-	chan->topics = NULL;
+	ao2_cleanup(chan->topic);
+	chan->topic = NULL;
 
 	ast_channel_internal_set_stream_topology(chan, NULL);
 
@@ -1431,16 +1445,7 @@
 		return ast_channel_topic_all();
 	}
 
-	return stasis_cp_single_topic(chan->topics);
-}
-
-struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan)
-{
-	if (!chan) {
-		return ast_channel_topic_all_cached();
-	}
-
-	return stasis_cp_single_topic_cached(chan->topics);
+	return chan->topic;
 }
 
 int ast_channel_forward_endpoint(struct ast_channel *chan,
@@ -1456,28 +1461,28 @@
 		return -1;
 	}
 
-	chan->endpoint_cache_forward = stasis_forward_all(ast_channel_topic_cached(chan),
-		ast_endpoint_topic(endpoint));
-	if (!chan->endpoint_cache_forward) {
-		chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
-		return -1;
-	}
-
 	return 0;
 }
 
 int ast_channel_internal_setup_topics(struct ast_channel *chan)
 {
 	const char *topic_name = chan->uniqueid.unique_id;
-	ast_assert(chan->topics == NULL);
+	ast_assert(chan->topic == NULL);
 
 	if (ast_strlen_zero(topic_name)) {
 		topic_name = "<dummy-channel>";
 	}
 
-	chan->topics = stasis_cp_single_create(
-		ast_channel_cache_all(), topic_name);
-	if (!chan->topics) {
+	chan->topic = stasis_topic_create(topic_name);
+	if (!chan->topic) {
+		return -1;
+	}
+
+	chan->channel_forward = stasis_forward_all(ast_channel_topic(chan),
+		ast_channel_topic_all());
+	if (!chan->channel_forward) {
+		ao2_ref(chan->topic, -1);
+		chan->topic = NULL;
 		return -1;
 	}
 
@@ -1568,3 +1573,14 @@
 {
 	return (chan->tech && chan->tech->read_stream && chan->tech->write_stream);
 }
+
+struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan)
+{
+	return chan->snapshot;
+}
+
+void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot)
+{
+	ao2_cleanup(chan->snapshot);
+	chan->snapshot = ao2_bump(snapshot);
+}
diff --git a/main/cli.c b/main/cli.c
index cf51d0d..5484e47 100644
--- a/main/cli.c
+++ b/main/cli.c
@@ -956,7 +956,7 @@
 
 	struct ao2_container *channels;
 	struct ao2_iterator it_chans;
-	struct stasis_message *msg;
+	struct ast_channel_snapshot *cs;
 	int numchans = 0, concise = 0, verbose = 0, count = 0;
 
 	switch (cmd) {
@@ -989,11 +989,7 @@
 	} else if (a->argc != e->args - 1)
 		return CLI_SHOWUSAGE;
 
-
-	if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
-		ast_cli(a->fd, "Failed to retrieve cached channels\n");
-		return CLI_SUCCESS;
-	}
+	channels = ast_channel_cache_by_name();
 
 	if (!count) {
 		if (!concise && !verbose)
@@ -1004,8 +1000,7 @@
 	}
 
 	it_chans = ao2_iterator_init(channels, 0);
-	for (; (msg = ao2_iterator_next(&it_chans)); ao2_ref(msg, -1)) {
-		struct ast_channel_snapshot *cs = stasis_message_data(msg);
+	for (; (cs = ao2_iterator_next(&it_chans)); ao2_ref(cs, -1)) {
 		char durbuf[16] = "-";
 
 		if (!count) {
@@ -1679,29 +1674,25 @@
 	struct ao2_container *cached_channels;
 	char *ret = NULL;
 	struct ao2_iterator iter;
-	struct stasis_message *msg;
+	struct ast_channel_snapshot *snapshot;
 
 	if (pos != rpos) {
 		return NULL;
 	}
 
-	if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) {
-		return NULL;
-	}
+	cached_channels = ast_channel_cache_all();
 
 	iter = ao2_iterator_init(cached_channels, 0);
-	for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) {
-		struct ast_channel_snapshot *snapshot = stasis_message_data(msg);
-
+	for (; (snapshot = ao2_iterator_next(&iter)); ao2_ref(snapshot, -1)) {
 		if (!strncasecmp(word, snapshot->name, wordlen) && (++which > state)) {
 			if (state != -1) {
 				ret = ast_strdup(snapshot->name);
-				ao2_ref(msg, -1);
+				ao2_ref(snapshot, -1);
 				break;
 			}
 
 			if (ast_cli_completion_add(ast_strdup(snapshot->name))) {
-				ao2_ref(msg, -1);
+				ao2_ref(snapshot, -1);
 				break;
 			}
 		}
diff --git a/main/endpoints.c b/main/endpoints.c
index 030e26c..f3e3372 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -179,25 +179,23 @@
 	return 0;
 }
 
-/*! \brief Handler for channel snapshot cache clears */
+/*! \brief Handler for channel snapshot update */
 static void endpoint_cache_clear(void *data,
 	struct stasis_subscription *sub,
 	struct stasis_message *message)
 {
 	struct ast_endpoint *endpoint = data;
-	struct stasis_message *clear_msg = stasis_message_data(message);
-	struct ast_channel_snapshot *clear_snapshot;
+	struct ast_channel_snapshot_update *update = stasis_message_data(message);
 
-	if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
+	/* Only when the channel is dead do we remove it */
+	if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
 		return;
 	}
 
-	clear_snapshot = stasis_message_data(clear_msg);
-
 	ast_assert(endpoint != NULL);
 
 	ao2_lock(endpoint);
-	ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
+	ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->uniqueid);
 	ao2_unlock(endpoint);
 	endpoint_publish_snapshot(endpoint);
 }
@@ -271,7 +269,7 @@
 			return NULL;
 		}
 		r |= stasis_message_router_add(endpoint->router,
-			stasis_cache_clear_type(), endpoint_cache_clear,
+			ast_channel_snapshot_type(), endpoint_cache_clear,
 			endpoint);
 		r |= stasis_message_router_add(endpoint->router,
 			stasis_subscription_change_type(), endpoint_subscription_change,
diff --git a/main/manager.c b/main/manager.c
index 0469a73..76a827c 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -6250,7 +6250,7 @@
 	int numchans = 0;
 	struct ao2_container *channels;
 	struct ao2_iterator it_chans;
-	struct stasis_message *msg;
+	struct ast_channel_snapshot *cs;
 
 	if (!ast_strlen_zero(actionid)) {
 		snprintf(idText, sizeof(idText), "ActionID: %s\r\n", actionid);
@@ -6258,17 +6258,12 @@
 		idText[0] = '\0';
 	}
 
-	channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type());
-	if (!channels) {
-		astman_send_error(s, m, "Could not get cached channels");
-		return 0;
-	}
+	channels = ast_channel_cache_by_name();
 
 	astman_send_listack(s, m, "Channels will follow", "start");
 
 	it_chans = ao2_iterator_init(channels, 0);
-	for (; (msg = ao2_iterator_next(&it_chans)); ao2_ref(msg, -1)) {
-		struct ast_channel_snapshot *cs = stasis_message_data(msg);
+	for (; (cs = ao2_iterator_next(&it_chans)); ao2_ref(cs, -1)) {
 		struct ast_str *built = ast_manager_build_channel_state_string_prefix(cs, "");
 		char durbuf[16] = "";
 
diff --git a/main/manager_bridges.c b/main/manager_bridges.c
index b7059f4..1b57049 100644
--- a/main/manager_bridges.c
+++ b/main/manager_bridges.c
@@ -528,17 +528,14 @@
 	char *uniqueid = obj;
 	struct mansession *s = arg;
 	struct bridge_list_data *list_data = data;
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-	struct ast_channel_snapshot *snapshot;
+	RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_str *, channel_text, NULL, ast_free);
 
-	msg = stasis_cache_get(ast_channel_cache(),
-		ast_channel_snapshot_type(), uniqueid);
-	if (!msg) {
+	snapshot = ast_channel_snapshot_get_latest(uniqueid);
+	if (!snapshot) {
 		return 0;
 	}
 
-	snapshot = stasis_message_data(msg);
 	if (snapshot->tech_properties & AST_CHAN_TP_INTERNAL) {
 		return 0;
 	}
diff --git a/main/manager_channels.c b/main/manager_channels.c
index ac09d42..887f77e 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -576,11 +576,6 @@
 {
 	int is_hungup, was_hungup;
 
-	if (!new_snapshot) {
-		/* Ignore cache clearing events; we'll see the hangup first */
-		return NULL;
-	}
-
 	/* The Newchannel, Newstate and Hangup events are closely related, in
 	 * in that they are mutually exclusive, basically different flavors
 	 * of a new channel state event.
@@ -616,11 +611,6 @@
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
 {
-	/* No Newexten event on cache clear */
-	if (!new_snapshot) {
-		return NULL;
-	}
-
 	/* Empty application is not valid for a Newexten event */
 	if (ast_strlen_zero(new_snapshot->appl)) {
 		return NULL;
@@ -654,8 +644,8 @@
 	struct ast_manager_event_blob *res;
 	char *callerid;
 
-	/* No NewCallerid event on cache clear or first event */
-	if (!old_snapshot || !new_snapshot) {
+	/* No NewCallerid event on first channel snapshot */
+	if (!old_snapshot) {
 		return NULL;
 	}
 
@@ -682,8 +672,8 @@
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
 {
-	/* No NewConnectedLine event on cache clear or first event */
-	if (!old_snapshot || !new_snapshot) {
+	/* No NewConnectedLine event on first channel snapshot */
+	if (!old_snapshot) {
 		return NULL;
 	}
 
@@ -699,7 +689,7 @@
 	struct ast_channel_snapshot *old_snapshot,
 	struct ast_channel_snapshot *new_snapshot)
 {
-	if (!old_snapshot || !new_snapshot) {
+	if (!old_snapshot) {
 		return NULL;
 	}
 
@@ -724,21 +714,14 @@
 				    struct stasis_message *message)
 {
 	RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
-	struct stasis_cache_update *update;
-	struct ast_channel_snapshot *old_snapshot;
-	struct ast_channel_snapshot *new_snapshot;
+	struct ast_channel_snapshot_update *update;
 	size_t i;
 
 	update = stasis_message_data(message);
 
-	ast_assert(ast_channel_snapshot_type() == update->type);
-
-	old_snapshot = stasis_message_data(update->old_snapshot);
-	new_snapshot = stasis_message_data(update->new_snapshot);
-
 	for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
 		RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
-		ev = channel_monitors[i](old_snapshot, new_snapshot);
+		ev = channel_monitors[i](update->old_snapshot, update->new_snapshot);
 
 		if (!ev) {
 			continue;
@@ -747,7 +730,7 @@
 		/* If we haven't already, build the channel event string */
 		if (!channel_event_string) {
 			channel_event_string =
-				ast_manager_build_channel_state_string(new_snapshot);
+				ast_manager_build_channel_state_string(update->new_snapshot);
 			if (!channel_event_string) {
 				return;
 			}
@@ -1260,7 +1243,7 @@
 	if (!message_router) {
 		return -1;
 	}
-	channel_topic = ast_channel_topic_all_cached();
+	channel_topic = ast_channel_topic_all();
 	if (!channel_topic) {
 		return -1;
 	}
@@ -1272,7 +1255,7 @@
 
 	ast_register_cleanup(manager_channels_shutdown);
 
-	ret |= stasis_message_router_add_cache_update(message_router,
+	ret |= stasis_message_router_add(message_router,
 		ast_channel_snapshot_type(), channel_snapshot_update, NULL);
 
 	ret |= stasis_message_router_add(message_router,
diff --git a/main/stasis_channels.c b/main/stasis_channels.c
index 0da8005..ec8d70c 100644
--- a/main/stasis_channels.c
+++ b/main/stasis_channels.c
@@ -36,7 +36,6 @@
 #include "asterisk/bridge.h"
 #include "asterisk/translate.h"
 #include "asterisk/stasis.h"
-#include "asterisk/stasis_cache_pattern.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/dial.h"
 #include "asterisk/linkedlists.h"
@@ -117,53 +116,23 @@
 
 #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
 
-static struct stasis_cp_all *channel_cache_all;
-static struct stasis_cache *channel_cache_by_name;
-static struct stasis_caching_topic *channel_by_name_topic;
+static struct stasis_topic *channel_topic_all;
+static struct ao2_container *channel_cache;
+static struct ao2_container *channel_cache_by_name;
 
-struct stasis_cp_all *ast_channel_cache_all(void)
+struct ao2_container *ast_channel_cache_all(void)
 {
-	return channel_cache_all;
-}
-
-struct stasis_cache *ast_channel_cache(void)
-{
-	return stasis_cp_all_cache(channel_cache_all);
+	return ao2_bump(channel_cache);
 }
 
 struct stasis_topic *ast_channel_topic_all(void)
 {
-	return stasis_cp_all_topic(channel_cache_all);
+	return channel_topic_all;
 }
 
-struct stasis_topic *ast_channel_topic_all_cached(void)
+struct ao2_container *ast_channel_cache_by_name(void)
 {
-	return stasis_cp_all_topic_cached(channel_cache_all);
-}
-
-struct stasis_cache *ast_channel_cache_by_name(void)
-{
-	return channel_cache_by_name;
-}
-
-static const char *channel_snapshot_get_id(struct stasis_message *message)
-{
-	struct ast_channel_snapshot *snapshot;
-	if (ast_channel_snapshot_type() != stasis_message_type(message)) {
-		return NULL;
-	}
-	snapshot = stasis_message_data(message);
-	return snapshot->uniqueid;
-}
-
-static const char *channel_snapshot_get_name(struct stasis_message *message)
-{
-	struct ast_channel_snapshot *snapshot;
-	if (ast_channel_snapshot_type() != stasis_message_type(message)) {
-		return NULL;
-	}
-	snapshot = stasis_message_data(message);
-	return snapshot->name;
+	return ao2_bump(channel_cache_by_name);
 }
 
 /*!
@@ -219,6 +188,59 @@
 	return CMP_MATCH;
 }
 
+/*!
+ * \internal
+ * \brief Hash function (using uniqueid) for \ref ast_channel_snapshot objects
+ */
+static int channel_snapshot_uniqueid_hash_cb(const void *obj, const int flags)
+{
+	const struct ast_channel_snapshot *object = obj;
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		key = object->uniqueid;
+		break;
+	default:
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_case_hash(key);
+}
+
+/*!
+ * \internal
+ * \brief Comparison function (using uniqueid) for \ref ast_channel_snapshot objects
+ */
+static int channel_snapshot_uniqueid_cmp_cb(void *obj, void *arg, int flags)
+{
+	const struct ast_channel_snapshot *object_left = obj;
+	const struct ast_channel_snapshot *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = object_right->uniqueid;
+	case OBJ_SEARCH_KEY:
+		cmp = strcasecmp(object_left->uniqueid, right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		cmp = strncasecmp(object_left->uniqueid, right_key, strlen(right_key));
+		break;
+	default:
+		cmp = 0;
+		break;
+	}
+	if (cmp) {
+		return 0;
+	}
+	return CMP_MATCH;
+}
+
 static void channel_snapshot_dtor(void *obj)
 {
 	struct ast_channel_snapshot *snapshot = obj;
@@ -309,6 +331,34 @@
 	return snapshot;
 }
 
+static void channel_snapshot_update_dtor(void *obj)
+{
+	struct ast_channel_snapshot_update *update = obj;
+
+	ao2_cleanup(update->old_snapshot);
+	ao2_cleanup(update->new_snapshot);
+}
+
+static struct ast_channel_snapshot_update *channel_snapshot_update_create(struct ast_channel *chan)
+{
+	struct ast_channel_snapshot_update *update;
+
+	update = ao2_alloc_options(sizeof(*update), channel_snapshot_update_dtor,
+		AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!update) {
+		return NULL;
+	}
+
+	update->old_snapshot = ao2_bump(ast_channel_snapshot(chan));
+	update->new_snapshot = ast_channel_snapshot_create(chan);
+	if (!update->new_snapshot) {
+		ao2_ref(update, -1);
+		return NULL;
+	}
+
+	return update;
+}
+
 static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan)
 {
 	if (chan) {
@@ -521,7 +571,7 @@
 		return NULL;
 	}
 
-	snapshot = chan ? ast_channel_snapshot_create(chan) : NULL;
+	snapshot = chan ? ao2_bump(ast_channel_snapshot(chan)) : NULL;
 	msg = create_channel_blob_message(snapshot, type, blob);
 	ao2_cleanup(snapshot);
 	return msg;
@@ -628,38 +678,48 @@
 
 struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
 {
-	struct stasis_message *message;
-	struct ast_channel_snapshot *snapshot;
-
 	ast_assert(!ast_strlen_zero(uniqueid));
 
-	message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
-		uniqueid);
-	if (!message) {
-		return NULL;
-	}
-
-	snapshot = ao2_bump(stasis_message_data(message));
-	ao2_ref(message, -1);
-	return snapshot;
+	return ao2_find(channel_cache, uniqueid, OBJ_SEARCH_KEY);
 }
 
 struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name)
 {
-	struct stasis_message *message;
-	struct ast_channel_snapshot *snapshot;
-
 	ast_assert(!ast_strlen_zero(name));
 
-	message = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(),
-		name);
-	if (!message) {
-		return NULL;
+	return ao2_find(channel_cache_by_name, name, OBJ_SEARCH_KEY);
+}
+
+void ast_channel_publish_final_snapshot(struct ast_channel *chan)
+{
+	struct ast_channel_snapshot_update *update;
+	struct stasis_message *message;
+
+	if (!ast_channel_snapshot_type()) {
+		return;
 	}
 
-	snapshot = ao2_bump(stasis_message_data(message));
+	update = channel_snapshot_update_create(chan);
+	if (!update) {
+		return;
+	}
+
+	message = stasis_message_create(ast_channel_snapshot_type(), update);
+	/* On success, message holds its own reference to update, so update stays
+	 * valid until message is released at the end of this function.
+	 */
+	ao2_ref(update, -1);
+	if (!message) {
+		return;
+	}
+
+	ao2_unlink(channel_cache, update->old_snapshot);
+	ao2_unlink(channel_cache_by_name, update->old_snapshot);
+
+	ast_channel_snapshot_set(chan, NULL);
+
+	stasis_publish(ast_channel_topic(chan), message);
 	ao2_ref(message, -1);
-	return snapshot;
 }
 
 static void channel_role_snapshot_dtor(void *obj)
@@ -764,7 +824,7 @@
 
 void ast_channel_publish_snapshot(struct ast_channel *chan)
 {
-	struct ast_channel_snapshot *snapshot;
+	struct ast_channel_snapshot_update *update;
 	struct stasis_message *message;
 
 	if (!ast_channel_snapshot_type()) {
@@ -775,17 +835,40 @@
 		return;
 	}
 
-	snapshot = ast_channel_snapshot_create(chan);
-	if (!snapshot) {
+	update = channel_snapshot_update_create(chan);
+	if (!update) {
 		return;
 	}
 
-	message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
-	ao2_ref(snapshot, -1);
+	message = stasis_message_create(ast_channel_snapshot_type(), update);
+	/* On success, message holds its own reference to update, so update stays
+	 * valid until message is released at the end of this function.
+	 */
+	ao2_ref(update, -1);
 	if (!message) {
 		return;
 	}
 
+	/* We lock these ourselves so that the update is atomic and there is never a
+	 * time when a snapshot is not in the cache.
+	 */
+	ao2_wrlock(channel_cache);
+	if (update->old_snapshot) {
+		ao2_unlink_flags(channel_cache, update->old_snapshot, OBJ_NOLOCK);
+	}
+	ao2_link_flags(channel_cache, update->new_snapshot, OBJ_NOLOCK);
+	ao2_unlock(channel_cache);
+
+	/* The same applies here. */
+	ao2_wrlock(channel_cache_by_name);
+	if (update->old_snapshot) {
+		ao2_unlink_flags(channel_cache_by_name, update->old_snapshot, OBJ_NOLOCK);
+	}
+	ao2_link_flags(channel_cache_by_name, update->new_snapshot, OBJ_NOLOCK);
+	ao2_unlock(channel_cache_by_name);
+
+	ast_channel_snapshot_set(chan, update->new_snapshot);
+
 	ast_assert(ast_channel_topic(chan) != NULL);
 	stasis_publish(ast_channel_topic(chan), message);
 	ao2_ref(message, -1);
@@ -841,13 +924,8 @@
 		ast_channel_publish_snapshot(chan);
 	}
 
-	if (chan) {
-		ast_channel_publish_cached_blob(chan, ast_channel_varset_type(), blob);
-	} else {
-		/* This function is NULL safe for global variables */
-		ast_channel_publish_blob(NULL, ast_channel_varset_type(), blob);
-	}
-
+	/* This function is NULL safe for global variables */
+	ast_channel_publish_blob(chan, ast_channel_varset_type(), blob);
 	ast_json_unref(blob);
 }
 
@@ -931,36 +1009,6 @@
 	return ev;
 }
 
-void ast_publish_channel_state(struct ast_channel *chan)
-{
-	struct ast_channel_snapshot *snapshot;
-	struct stasis_message *message;
-
-	if (!ast_channel_snapshot_type()) {
-		return;
-	}
-
-	ast_assert(chan != NULL);
-	if (!chan) {
-		return;
-	}
-
-	snapshot = ast_channel_snapshot_create(chan);
-	if (!snapshot) {
-		return;
-	}
-
-	message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
-	ao2_ref(snapshot, -1);
-	if (!message) {
-		return;
-	}
-
-	ast_assert(ast_channel_topic(chan) != NULL);
-	stasis_publish(ast_channel_topic(chan), message);
-	ao2_ref(message, -1);
-}
-
 struct ast_json *ast_channel_snapshot_to_json(
 	const struct ast_channel_snapshot *snapshot,
 	const struct stasis_message_sanitizer *sanitize)
@@ -1332,12 +1380,12 @@
 
 static void stasis_channels_cleanup(void)
 {
-	stasis_caching_unsubscribe_and_join(channel_by_name_topic);
-	channel_by_name_topic = NULL;
+	ao2_cleanup(channel_topic_all);
+	channel_topic_all = NULL;
+	ao2_cleanup(channel_cache);
+	channel_cache = NULL;
 	ao2_cleanup(channel_cache_by_name);
 	channel_cache_by_name = NULL;
-	ao2_cleanup(channel_cache_all);
-	channel_cache_all = NULL;
 
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
@@ -1367,29 +1415,28 @@
 
 	ast_register_cleanup(stasis_channels_cleanup);
 
-	channel_cache_all = stasis_cp_all_create("ast_channel_topic_all",
-		channel_snapshot_get_id);
-	if (!channel_cache_all) {
+	channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+	if (!channel_topic_all) {
 		return -1;
 	}
-	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
-	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);
 
-	channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name);
+	channel_cache = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
+		0, AST_NUM_CHANNEL_BUCKETS, channel_snapshot_uniqueid_hash_cb,
+		NULL, channel_snapshot_uniqueid_cmp_cb);
+	if (!channel_cache) {
+		return -1;
+	}
+
+	channel_cache_by_name = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
+		0, AST_NUM_CHANNEL_BUCKETS, channel_snapshot_hash_cb,
+		NULL, channel_snapshot_cmp_cb);
 	if (!channel_cache_by_name) {
 		return -1;
 	}
 
-	/* This should be initialized before the caching topic */
+	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
+	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);
 	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);
-
-	channel_by_name_topic = stasis_caching_topic_create(
-		stasis_cp_all_topic(channel_cache_all),
-		channel_cache_by_name);
-	if (!channel_by_name_topic) {
-		return -1;
-	}
-
 	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
 	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
 	res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c
index bca32f1..f96192b 100644
--- a/res/ari/resource_channels.c
+++ b/res/ari/resource_channels.c
@@ -833,32 +833,19 @@
 	struct ast_ari_channels_get_args *args,
 	struct ast_ari_response *response)
 {
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-	struct stasis_cache *cache;
 	struct ast_channel_snapshot *snapshot;
 
-	cache = ast_channel_cache();
-	if (!cache) {
-		ast_ari_response_error(
-			response, 500, "Internal Server Error",
-			"Message bus not initialized");
-		return;
-	}
-
-	msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
-				   args->channel_id);
-	if (!msg) {
+	snapshot = ast_channel_snapshot_get_latest(args->channel_id);
+	if (!snapshot) {
 		ast_ari_response_error(
 			response, 404, "Not Found",
 			"Channel not found");
 		return;
 	}
 
-	snapshot = stasis_message_data(msg);
-	ast_assert(snapshot != NULL);
-
 	ast_ari_response_ok(response,
 				ast_channel_snapshot_to_json(snapshot, NULL));
+	ao2_ref(snapshot, -1);
 }
 
 void ast_ari_channels_hangup(struct ast_variable *headers,
@@ -903,27 +890,13 @@
 	struct ast_ari_channels_list_args *args,
 	struct ast_ari_response *response)
 {
-	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
 	RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
 	struct ao2_iterator i;
 	void *obj;
 	struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
 
-	cache = ast_channel_cache();
-	if (!cache) {
-		ast_ari_response_error(
-			response, 500, "Internal Server Error",
-			"Message bus not initialized");
-		return;
-	}
-	ao2_ref(cache, +1);
-
-	snapshots = stasis_cache_dump(cache, ast_channel_snapshot_type());
-	if (!snapshots) {
-		ast_ari_response_alloc_failed(response);
-		return;
-	}
+	snapshots = ast_channel_cache_all();
 
 	json = ast_json_array_create();
 	if (!json) {
@@ -933,12 +906,12 @@
 
 	i = ao2_iterator_init(snapshots, 0);
 	while ((obj = ao2_iterator_next(&i))) {
-		RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
-		struct ast_channel_snapshot *snapshot = stasis_message_data(msg);
+		struct ast_channel_snapshot *snapshot = obj;
 		int r;
 
 		if (sanitize && sanitize->channel_snapshot
 			&& sanitize->channel_snapshot(snapshot)) {
+			ao2_ref(snapshot, -1);
 			continue;
 		}
 
@@ -947,8 +920,10 @@
 		if (r != 0) {
 			ast_ari_response_alloc_failed(response);
 			ao2_iterator_destroy(&i);
+			ao2_ref(snapshot, -1);
 			return;
 		}
+		ao2_ref(snapshot, -1);
 	}
 	ao2_iterator_destroy(&i);
 
diff --git a/res/res_agi.c b/res/res_agi.c
index 0931c1a..e322d7f 100644
--- a/res/res_agi.c
+++ b/res/res_agi.c
@@ -3182,13 +3182,13 @@
 		ast_agi_send(agi->fd, chan, "200 result=%u\n", ast_channel_state(chan));
 		return RESULT_SUCCESS;
 	} else if (argc == 3) {
-		RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+		struct ast_channel_snapshot *snapshot;
 
 		/* one argument: look for info on the specified channel */
-		if ((msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), argv[2]))) {
-			struct ast_channel_snapshot *snapshot = stasis_message_data(msg);
-
+		snapshot = ast_channel_snapshot_get_latest_by_name(argv[2]);
+		if (snapshot) {
 			ast_agi_send(agi->fd, chan, "200 result=%u\n", snapshot->state);
+			ao2_ref(snapshot, -1);
 			return RESULT_SUCCESS;
 		}
 		/* if we get this far no channel name matched the argument given */
diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c
index dbc79f0..bed95a0 100644
--- a/res/res_chan_stats.c
+++ b/res/res_chan_stats.c
@@ -78,7 +78,7 @@
 }
 
 /*!
- * \brief Router callback for \ref stasis_cache_update messages.
+ * \brief Router callback for \ref ast_channel_snapshot_update messages.
  * \param data Data pointer given when added to router.
  * \param sub This subscription.
  * \param topic The topic the message was posted to. This is not necessarily the
@@ -92,34 +92,25 @@
 	/* Since this came from a message router, we know the type of the
 	 * message. We can cast the data without checking its type.
 	 */
-	struct stasis_cache_update *update = stasis_message_data(message);
+	struct ast_channel_snapshot_update *update = stasis_message_data(message);
 
-	/* We're only interested in channel snapshots, so check the type
-	 * of the underlying message.
-	 */
-	if (ast_channel_snapshot_type() != update->type) {
-		return;
-	}
-
-	/* There are three types of cache updates.
-	 * !old && new -> Initial cache entry
-	 * old && new -> Updated cache entry
-	 * old && !new -> Cache entry removed.
+	/* There are three types of channel snapshot updates.
+	 * !old && new -> Initial channel creation
+	 * old && new -> Updated channel snapshot
+	 * old && dead -> Final channel snapshot
 	 */
 
 	if (!update->old_snapshot && update->new_snapshot) {
-		/* Initial cache entry; count a channel creation */
+		/* Initial channel snapshot; count a channel creation */
 		ast_statsd_log_string("channels.count", AST_STATSD_GAUGE, "+1", 1.0);
-	} else if (update->old_snapshot && !update->new_snapshot) {
-		/* Cache entry removed. Compute the age of the channel and post
+	} else if (update->old_snapshot && ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
+		/* Channel is gone. Compute the age of the channel and post
 		 * that, as well as decrementing the channel count.
 		 */
-		struct ast_channel_snapshot *last;
 		int64_t age;
 
-		last = stasis_message_data(update->old_snapshot);
 		age = ast_tvdiff_ms(*stasis_message_timestamp(message),
-			last->creationtime);
+			update->new_snapshot->creationtime);
 		ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age);
 
 		/* And decrement the channel count */
@@ -161,11 +152,11 @@
 {
 	/* You can create a message router to route messages by type */
 	router = stasis_message_router_create(
-		ast_channel_topic_all_cached());
+		ast_channel_topic_all());
 	if (!router) {
 		return AST_MODULE_LOAD_DECLINE;
 	}
-	stasis_message_router_add(router, stasis_cache_update_type(),
+	stasis_message_router_add(router, ast_channel_snapshot_type(),
 		updates, NULL);
 	stasis_message_router_set_default(router, default_route, NULL);
 
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 18ac7d6..b4f3bc6 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -144,17 +144,11 @@
 	}
 
 	forwards->forward_type = FORWARD_CHANNEL;
-	if (chan) {
-		forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
-			app->topic);
-	}
-	forwards->topic_cached_forward = stasis_forward_all(
-		chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
+	forwards->topic_forward = stasis_forward_all(
+		chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
 		app->topic);
 
-	if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
-		/* Half-subscribed is a bad thing */
-		forwards_unsubscribe(forwards);
+	if (!forwards->topic_forward) {
 		ao2_ref(forwards, -1);
 		return NULL;
 	}
@@ -420,7 +414,7 @@
 
 	if (!old_snapshot) {
 		return channel_created_event(snapshot, tv);
-	} else if (!new_snapshot) {
+	} else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
 		return channel_destroyed_event(snapshot, tv);
 	} else if (old_snapshot->state != new_snapshot->state) {
 		return channel_state_change_event(snapshot, tv);
@@ -436,8 +430,8 @@
 {
 	struct ast_json *json_channel;
 
-	/* No Newexten event on cache clear or first event */
-	if (!old_snapshot || !new_snapshot) {
+	/* No Newexten event on first channel snapshot */
+	if (!old_snapshot) {
 		return NULL;
 	}
 
@@ -470,8 +464,8 @@
 {
 	struct ast_json *json_channel;
 
-	/* No NewCallerid event on cache clear or first event */
-	if (!old_snapshot || !new_snapshot) {
+	/* No NewCallerid event on first channel snapshot */
+	if (!old_snapshot) {
 		return NULL;
 	}
 
@@ -500,8 +494,8 @@
 {
 	struct ast_json *json_channel;
 
-	/* No ChannelConnectedLine event on cache clear or first event */
-	if (!old_snapshot || !new_snapshot) {
+	/* No ChannelConnectedLine event on first channel snapshot */
+	if (!old_snapshot) {
 		return NULL;
 	}
 
@@ -532,39 +526,22 @@
 	struct stasis_message *message)
 {
 	struct stasis_app *app = data;
-	struct stasis_cache_update *update;
-	struct ast_channel_snapshot *new_snapshot;
-	struct ast_channel_snapshot *old_snapshot;
-	const struct timeval *tv;
+	struct ast_channel_snapshot_update *update = stasis_message_data(message);
 	int i;
 
-	ast_assert(stasis_message_type(message) == stasis_cache_update_type());
-
-	update = stasis_message_data(message);
-
-	ast_assert(update->type == ast_channel_snapshot_type());
-
-	new_snapshot = stasis_message_data(update->new_snapshot);
-	old_snapshot = stasis_message_data(update->old_snapshot);
-
-	/* Pull timestamp from the new snapshot, or from the update message
-	 * when there isn't one. */
-	tv = update->new_snapshot ?
-		stasis_message_timestamp(update->new_snapshot) :
-		stasis_message_timestamp(message);
-
 	for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
 		struct ast_json *msg;
 
-		msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
+		msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
+			stasis_message_timestamp(message));
 		if (msg) {
 			app_send(app, msg);
 			ast_json_unref(msg);
 		}
 	}
 
-	if (!new_snapshot && old_snapshot) {
-		unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
+	if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
+		unsubscribe(app, "channel", update->new_snapshot->uniqueid, 1);
 	}
 }
 
@@ -987,7 +964,7 @@
 	res |= stasis_message_router_add_cache_update(app->router,
 		ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
 
-	res |= stasis_message_router_add_cache_update(app->router,
+	res |= stasis_message_router_add(app->router,
 		ast_channel_snapshot_type(), sub_channel_update_handler, app);
 
 	res |= stasis_message_router_add_cache_update(app->router,
diff --git a/res/stasis/control.c b/res/stasis/control.c
index e4d007c..5b3b048 100644
--- a/res/stasis/control.c
+++ b/res/stasis/control.c
@@ -773,22 +773,7 @@
 struct ast_channel_snapshot *stasis_app_control_get_snapshot(
 	const struct stasis_app_control *control)
 {
-	struct stasis_message *msg;
-	struct ast_channel_snapshot *snapshot;
-
-	msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
-		stasis_app_control_get_channel_id(control));
-	if (!msg) {
-		return NULL;
-	}
-
-	snapshot = stasis_message_data(msg);
-	ast_assert(snapshot != NULL);
-
-	ao2_ref(snapshot, +1);
-	ao2_ref(msg, -1);
-
-	return snapshot;
+	return ast_channel_snapshot_get_latest(stasis_app_control_get_channel_id(control));
 }
 
 static int app_send_command_on_condition(struct stasis_app_control *control,
diff --git a/tests/test_cel.c b/tests/test_cel.c
index 0b17d48..c1e7340 100644
--- a/tests/test_cel.c
+++ b/tests/test_cel.c
@@ -276,8 +276,7 @@
 	ast_hangup((channel)); \
 	HANGUP_EVENT(channel, cause, dialstatus); \
 	APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \
-	ao2_cleanup(stasis_cache_get(ast_channel_cache(), \
-		ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \
+	ao2_cleanup(ast_channel_snapshot_get_latest(ast_channel_uniqueid(channel))); \
 	ao2_cleanup(channel); \
 	channel = NULL; \
 	} while (0)
diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c
index 9a49dd8..8408f36 100644
--- a/tests/test_stasis_endpoints.c
+++ b/tests/test_stasis_endpoints.c
@@ -255,32 +255,18 @@
 	ast_hangup(chan);
 	chan = NULL;
 
-	actual_count = stasis_message_sink_wait_for_count(sink, 6,
+	actual_count = stasis_message_sink_wait_for_count(sink, 3,
 		STASIS_SINK_DEFAULT_WAIT);
-	ast_test_validate(test, 6 == actual_count);
+	ast_test_validate(test, 3 == actual_count);
 
 	msg = sink->messages[1];
 	type = stasis_message_type(msg);
-	ast_test_validate(test, stasis_cache_update_type() == type);
+	ast_test_validate(test, ast_channel_snapshot_type() == type);
 
 	msg = sink->messages[2];
 	type = stasis_message_type(msg);
-	ast_test_validate(test, ast_channel_snapshot_type() == type);
-
-	msg = sink->messages[3];
-	type = stasis_message_type(msg);
-	ast_test_validate(test, stasis_cache_update_type() == type);
-
-	/* The ordering of the cache clear and endpoint snapshot are
-	 * unspecified */
-	msg = sink->messages[4];
-	if (stasis_message_type(msg) == stasis_cache_clear_type()) {
-		/* Okay; the next message should be the endpoint snapshot */
-		msg = sink->messages[5];
-	}
-
-	type = stasis_message_type(msg);
 	ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+
 	actual_snapshot = stasis_message_data(msg);
 	ast_test_validate(test, 0 == actual_snapshot->num_channels);
 

-- 
To view, visit https://gerrit.asterisk.org/10478

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I9334febff60a82d7c39703e49059fa3a68825786
Gerrit-Change-Number: 10478
Gerrit-PatchSet: 12
Gerrit-Owner: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Benjamin Keith Ford <bford at digium.com>
Gerrit-Reviewer: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: Jenkins2 (1000185)
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Kevin Harwell <kharwell at digium.com>