[svn-commits] rmudgett: branch rmudgett/stasis_cache r408837 - in /team/rmudgett/stasis_cac...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Fri Feb 21 22:18:58 CST 2014


Author: rmudgett
Date: Fri Feb 21 22:18:53 2014
New Revision: 408837

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=408837
Log:
stasis: Add eid support to stasis messaging and cache.

* Convert all stasis messages to have an eid to identify its source.

* Make stasis cache store items with the same id but from different source
eids in the same cache entry.

Note: The device state and mwi state events are published to stasis with
the eid source and consumers need to be updated to get an aggregated state
for a specific device and mwi.  The device_state_aggregation_test unit
test currently fails as a result.

Modified:
    team/rmudgett/stasis_cache/include/asterisk/stasis.h
    team/rmudgett/stasis_cache/main/app.c
    team/rmudgett/stasis_cache/main/devicestate.c
    team/rmudgett/stasis_cache/main/stasis_cache.c
    team/rmudgett/stasis_cache/main/stasis_message.c
    team/rmudgett/stasis_cache/tests/test_devicestate.c

Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Fri Feb 21 22:18:53 2014
@@ -290,11 +290,39 @@
  *
  * \param type Type of the message
  * \param data Immutable data that is the actual contents of the message
+ *
  * \return New message
  * \return \c NULL on error
+ *
  * \since 12
  */
 struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
+
+/*!
+ * \brief Create a new message for an entity.
+ *
+ * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
+ * with it. Messages are also immutable, and must not be modified after they
+ * are initialized. Especially the \a data in the message.
+ *
+ * \param type Type of the message
+ * \param data Immutable data that is the actual contents of the message
+ * \param eid What entity originated this message.
+ *
+ * \return New message
+ * \return \c NULL on error
+ */
+struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
+
+/*!
+ * \brief Get the entity id for a \ref stasis_message.
+ *
+ * \param msg Message to get eid.
+ *
+ * \return Entity id of \a msg
+ * \return \c NULL if \a msg is \c NULL.
+ */
+const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
 
 /*!
  * \brief Get the message type for a \ref stasis_message.
@@ -749,31 +777,72 @@
 struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
 
 /*!
- * \brief Retrieve an item from the cache.
+ * \brief Retrieve an item from the cache for the ast_eid_default entity.
  *
  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
  *
  * \param cache The cache to query.
  * \param type Type of message to retrieve.
  * \param id Identity of the snapshot to retrieve.
- * \return Message from the cache.
- * \return \c NULL if message is not found.
- * \since 12
- */
-struct stasis_message *stasis_cache_get(
-	struct stasis_cache *cache, struct stasis_message_type *type,
-	const char *id);
-
-/*!
- * \brief Dump cached items to a subscription
+ *
+ * \retval Message from the cache.
+ * \retval \c NULL if message is not found.
+ *
+ * \since 12
+ */
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Retrieve an item from the cache for a specific entity.
+ *
+ * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ * \param eid Specific entity id to retrieve.  NULL if any entity, but prefer internal.
+ *
+ * \retval Message from the cache.
+ * \retval \c NULL if message is not found.
+ */
+struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
+
+/*!
+ * \brief Retrieve all matching items from the cache.
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ *
+ * \retval Container of matching items found.
+ * \retval \c NULL if error.
+ */
+struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Dump cached items to a subscription for the ast_eid_default entity.
+ *
  * \param cache The cache to query.
  * \param type Type of message to dump (any type if \c NULL).
- * \return ao2_container containing all matches (must be unreffed by caller)
- * \return \c NULL on allocation error
- * \since 12
- */
-struct ao2_container *stasis_cache_dump(struct stasis_cache *cache,
-	struct stasis_message_type *type);
+ *
+ * \retval ao2_container containing all matches (must be unreffed by caller)
+ * \retval \c NULL on allocation error
+ *
+ * \since 12
+ */
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
+
+/*!
+ * \brief Dump cached items to a subscription for a specific entity.
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to dump (any type if \c NULL).
+ * \param eid Specific entity id to retrieve.  NULL if any entities.
+ *
+ * \retval ao2_container containing all matches (must be unreffed by caller)
+ * \retval \c NULL on allocation error
+ */
+struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
 
 /*! @} */
 

Modified: team/rmudgett/stasis_cache/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/app.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/app.c (original)
+++ team/rmudgett/stasis_cache/main/app.c Fri Feb 21 22:18:53 2014
@@ -2821,7 +2821,7 @@
 	return mwi_state;
 }
 
-
+/* BUGBUG mwi_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
 int ast_publish_mwi_state_full(
 			const char *mailbox,
 			const char *context,
@@ -2857,10 +2857,13 @@
 	if (eid) {
 		mwi_state->eid = *eid;
 	} else {
-		ast_set_default_eid(&mwi_state->eid);
-	}
-
-	message = stasis_message_create(ast_mwi_state_type(), mwi_state);
+		mwi_state->eid = ast_eid_default;
+	}
+
+	message = stasis_message_create_full(ast_mwi_state_type(), mwi_state, &mwi_state->eid);
+	if (!message) {
+		return -1;
+	}
 
 	mailbox_specific_topic = ast_mwi_topic(mwi_state->uniqueid);
 	if (!mailbox_specific_topic) {
@@ -2911,7 +2914,7 @@
 	ao2_ref(obj->mwi_state, +1);
 	obj->blob = ast_json_ref(blob);
 
-	msg = stasis_message_create(message_type, obj);
+	msg = stasis_message_create_full(message_type, obj, &mwi_state->eid);
 	if (!msg) {
 		return NULL;
 	}

Modified: team/rmudgett/stasis_cache/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/devicestate.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/devicestate.c (original)
+++ team/rmudgett/stasis_cache/main/devicestate.c Fri Feb 21 22:18:53 2014
@@ -750,6 +750,7 @@
 	return 0;
 }
 
+/* BUGBUG device_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
 int ast_publish_device_state_full(
 			const char *device,
 			enum ast_device_state state,
@@ -767,7 +768,11 @@
 		return -1;
 	}
 
-	message = stasis_message_create(ast_device_state_message_type(), device_state);
+	message = stasis_message_create_full(ast_device_state_message_type(), device_state,
+		eid ?: &ast_eid_default);
+	if (!message) {
+		return -1;
+	}
 
 	device_specific_topic = ast_device_state_topic(device);
 	if (!device_specific_topic) {

Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Fri Feb 21 22:18:53 2014
@@ -36,6 +36,7 @@
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
 #include "asterisk/utils.h"
+#include "asterisk/vector.h"
 
 #ifdef LOW_MEMORY
 #define NUM_CACHE_BUCKETS 17
@@ -131,27 +132,42 @@
 
 struct cache_entry {
 	struct cache_entry_key key;
-	struct stasis_message *snapshot;
+	/*! Internal snapshot of the stasis event. */
+	struct stasis_message *internal;
+	/*! External snapshots of the stasis event. */
+	AST_VECTOR(, struct stasis_message *) external;
 };
 
 static void cache_entry_dtor(void *obj)
 {
 	struct cache_entry *entry = obj;
+	size_t idx;
 
 	ao2_cleanup(entry->key.type);
 	entry->key.type = NULL;
 	ast_free((char *) entry->key.id);
 	entry->key.id = NULL;
-	ao2_cleanup(entry->snapshot);
-	entry->snapshot = NULL;
+
+	ao2_cleanup(entry->internal);
+	entry->internal = NULL;
+
+	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+		struct stasis_message *external;
+
+		external = AST_VECTOR_GET(&entry->external, idx);
+		ao2_cleanup(external);
+	}
+	AST_VECTOR_FREE(&entry->external);
 }
 
 static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
 {
 	struct cache_entry *entry;
+	int is_external;
 
 	ast_assert(type != NULL);
 	ast_assert(id != NULL);
+	ast_assert(snapshot != NULL);
 
 	entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
 		AO2_ALLOC_OPT_LOCK_NOLOCK);
@@ -166,7 +182,21 @@
 	}
 	entry->key.type = ao2_bump(type);
 
-	entry->snapshot = ao2_bump(snapshot);
+	is_external = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
+	if (AST_VECTOR_INIT(&entry->external, is_external)) {
+		ao2_cleanup(entry);
+		return NULL;
+	}
+
+	if (is_external) {
+		if (AST_VECTOR_APPEND(&entry->external, snapshot)) {
+			ao2_cleanup(entry);
+			return NULL;
+		}
+	} else {
+		entry->internal = snapshot;
+	}
+	ao2_bump(snapshot);
 
 	return entry;
 }
@@ -264,8 +294,91 @@
 	return cache;
 }
 
+/*!
+ * \internal
+ * \brief Remove the stasis snapshot in the cache entry determined by eid.
+ *
+ * \param entries Container of cached entries.
+ * \param cached_entry The entry to remove the snapshot from.
+ * \param eid Which snapshot in the cached entry.
+ *
+ * \note The entries container is already locked.
+ *
+ * \return Previous stasis entry snapshot.
+ */
+static struct stasis_message *cache_remove(struct ao2_container *entries, struct cache_entry *cached_entry, const struct ast_eid *eid)
+{
+	struct stasis_message *old_snapshot;
+	int is_external;
+
+	is_external = ast_eid_cmp(eid, &ast_eid_default);
+	if (!is_external) {
+		old_snapshot = cached_entry->internal;
+		cached_entry->internal = NULL;
+	} else {
+		int idx;
+
+		old_snapshot = NULL;
+		for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+			struct stasis_message *cur;
+
+			cur = AST_VECTOR_GET(&cached_entry->external, idx);
+			if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+				old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+				break;
+			}
+		}
+	}
+
+	if (!cached_entry->internal && !AST_VECTOR_SIZE(&cached_entry->external)) {
+		ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
+	}
+
+	return old_snapshot;
+}
+
+/*!
+ * \internal
+ * \brief Update the stasis snapshot in the cache entry determined by eid.
+ *
+ * \param cached_entry The entry to remove the snapshot from.
+ * \param eid Which snapshot in the cached entry.
+ * \param new_snapshot Snapshot to replace the old snapshot.
+ *
+ * \return Previous stasis entry snapshot.
+ */
+static struct stasis_message *cache_udpate(struct cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
+{
+	struct stasis_message *old_snapshot;
+	int is_external;
+	int idx;
+
+	is_external = ast_eid_cmp(eid, &ast_eid_default);
+	if (!is_external) {
+		old_snapshot = cached_entry->internal;
+		cached_entry->internal = ao2_bump(new_snapshot);
+		return old_snapshot;
+	}
+
+	old_snapshot = NULL;
+	for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+		struct stasis_message *cur;
+
+		cur = AST_VECTOR_GET(&cached_entry->external, idx);
+		if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+			old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+			break;
+		}
+	}
+	if (!AST_VECTOR_APPEND(&cached_entry->external, new_snapshot)) {
+		ao2_bump(new_snapshot);
+	}
+
+	return old_snapshot;
+}
+
 static struct stasis_message *cache_put(struct stasis_cache *cache,
-	struct stasis_message_type *type, const char *id,
+	struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
 	struct stasis_message *new_snapshot)
 {
 	struct cache_entry_key search_key;
@@ -273,6 +386,7 @@
 	struct stasis_message *old_snapshot = NULL;
 
 	ast_assert(cache->entries != NULL);
+	ast_assert(eid != NULL);
 	ast_assert(new_snapshot == NULL ||
 		type == stasis_message_type(new_snapshot));
 
@@ -280,27 +394,21 @@
 
 	search_key.type = type;
 	search_key.id = id;
-
-	if (new_snapshot == NULL) {
-		/* Remove entry from cache */
-		cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NOLOCK);
+	cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+	if (!new_snapshot) {
+		/* Remove snapshot from cache */
 		if (cached_entry) {
-			old_snapshot = cached_entry->snapshot;
-			cached_entry->snapshot = NULL;
-		}
+			old_snapshot = cache_remove(cache->entries, cached_entry, eid);
+		}
+	} else if (cached_entry) {
+		/* Update snapshot in cache */
+		old_snapshot = cache_udpate(cached_entry, eid, new_snapshot);
 	} else {
-		/* Insert/update cache */
-		cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+		/* Insert into the cache */
+		cached_entry = cache_entry_create(type, id, new_snapshot);
 		if (cached_entry) {
-			/* Update cache. Because objects are moving, no need to update refcounts. */
-			old_snapshot = cached_entry->snapshot;
-			cached_entry->snapshot = ao2_bump(new_snapshot);
-		} else {
-			/* Insert into the cache */
-			cached_entry = cache_entry_create(type, id, new_snapshot);
-			if (cached_entry) {
-				ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
-			}
+			ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
 		}
 	}
 
@@ -310,25 +418,136 @@
 	return old_snapshot;
 }
 
-struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
+/*!
+ * \internal
+ * \brief Dump all snapshots in the cache entry into the given container.
+ *
+ * \param snapshots Container to put all snapshots in the cache entry.
+ * \param entry Cache entry to use.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int cache_entry_dump(struct ao2_container *snapshots, const struct cache_entry *entry)
+{
+	int idx;
+	int err = 0;
+
+	ast_assert(snapshots != NULL);
+	ast_assert(entry != NULL);
+
+	if (entry->internal) {
+		err |= !ao2_link(snapshots, entry->internal);
+	}
+
+	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+		struct stasis_message *snapshot;
+
+		snapshot = AST_VECTOR_GET(&entry->external, idx);
+		err |= !ao2_link(snapshots, snapshot);
+	}
+
+	return err;
+}
+
+struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 {
 	struct cache_entry_key search_key;
 	struct cache_entry *cached_entry;
-	struct stasis_message *snapshot = NULL;
+	struct ao2_container *found;
 
 	ast_assert(cache != NULL);
 	ast_assert(cache->entries != NULL);
 	ast_assert(type != NULL);
 	ast_assert(id != NULL);
 
+	found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
+	if (!found) {
+		return NULL;
+	}
+
 	ao2_rdlock(cache->entries);
 
 	search_key.type = type;
 	search_key.id = id;
-
+	cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	if (cached_entry && cache_entry_dump(found, cached_entry)) {
+		ao2_cleanup(found);
+		found = NULL;
+	}
+
+	ao2_unlock(cache->entries);
+
+	ao2_cleanup(cached_entry);
+	return found;
+}
+
+/*!
+ * \internal
+ * \brief Retrieve an item from the cache entry for a specific eid.
+ *
+ * \param entry Cache entry to use.
+ * \param eid Specific entity id to retrieve.  NULL if any entity, but prefer internal.
+ *
+ * \note The returned snapshot has not had its reference bumped.
+ *
+ * \retval Snapshot from the cache.
+ * \retval \c NULL if snapshot is not found.
+ */
+static struct stasis_message *cache_entry_by_eid(const struct cache_entry *entry, const struct ast_eid *eid)
+{
+	struct stasis_message *snapshot = NULL;
+
+	if (eid) {
+		int is_external;
+
+		/* Get snapshot with specific eid. */
+		is_external = ast_eid_cmp(eid, &ast_eid_default);
+		if (!is_external) {
+			snapshot = entry->internal;
+		} else {
+			int idx;
+
+			for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+				struct stasis_message *cur;
+
+				cur = AST_VECTOR_GET(&entry->external, idx);
+				if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+					snapshot = cur;
+					break;
+				}
+			}
+		}
+	} else {
+		/* Get any snapshot, but prefer internal. */
+		if (entry->internal) {
+			snapshot = entry->internal;
+		} else if (AST_VECTOR_SIZE(&entry->external)) {
+			snapshot = AST_VECTOR_GET(&entry->external, 0);
+		}
+	}
+
+	return snapshot;
+}
+
+struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
+{
+	struct cache_entry_key search_key;
+	struct cache_entry *cached_entry;
+	struct stasis_message *snapshot = NULL;
+
+	ast_assert(cache != NULL);
+	ast_assert(cache->entries != NULL);
+	ast_assert(type != NULL);
+	ast_assert(id != NULL);
+
+	ao2_rdlock(cache->entries);
+
+	search_key.type = type;
+	search_key.id = id;
 	cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
 	if (cached_entry) {
-		snapshot = cached_entry->snapshot;
+		snapshot = cache_entry_by_eid(cached_entry, eid);
 		ao2_bump(snapshot);
 	}
 
@@ -336,11 +555,17 @@
 
 	ao2_cleanup(cached_entry);
 	return snapshot;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
+{
+	return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
 }
 
 struct cache_dump_data {
 	struct ao2_container *cached;
 	struct stasis_message_type *type;
+	const struct ast_eid *eid;
 };
 
 static int cache_dump_cb(void *obj, void *arg, int flags)
@@ -349,27 +574,42 @@
 	struct cache_entry *entry = obj;
 
 	if (!cache_dump->type || entry->key.type == cache_dump->type) {
-		ao2_link(cache_dump->cached, entry->snapshot);
+		if (cache_dump->eid) {
+			struct stasis_message *snapshot;
+
+			snapshot = cache_entry_by_eid(entry, cache_dump->eid);
+			if (snapshot) {
+				ao2_link(cache_dump->cached, snapshot);
+			}
+		} else {
+			cache_entry_dump(cache_dump->cached, entry);
+		}
 	}
 
 	return 0;
 }
 
-struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
+struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 {
 	struct cache_dump_data cache_dump;
 
+	ast_assert(cache != NULL);
 	ast_assert(cache->entries != NULL);
 
+	cache_dump.eid = eid;
 	cache_dump.type = type;
-	cache_dump.cached = ao2_container_alloc_options(
-		AO2_ALLOC_OPT_LOCK_NOLOCK, 1, NULL, NULL);
+	cache_dump.cached = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
 	if (!cache_dump.cached) {
 		return NULL;
 	}
 
 	ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
 	return cache_dump.cached;
+}
+
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
+{
+	return stasis_cache_dump_full(cache, type, &ast_eid_default);
 }
 
 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
@@ -453,7 +693,8 @@
 		if (clear_id) {
 			struct stasis_message *old_snapshot;
 
-			old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
+			old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id,
+				stasis_message_eid(clear_msg), NULL);
 			if (old_snapshot) {
 				struct stasis_message *update;
 
@@ -479,7 +720,8 @@
 		struct stasis_message *old_snapshot;
 		struct stasis_message *update;
 
-		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
+		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id,
+			stasis_message_eid(message), message);
 
 		update = update_create(old_snapshot, message);
 		if (update) {

Modified: team/rmudgett/stasis_cache/main/stasis_message.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_message.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_message.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_message.c Fri Feb 21 22:18:53 2014
@@ -53,7 +53,7 @@
 struct stasis_message_type *stasis_message_type_create(const char *name,
 	struct stasis_message_vtable *vtable)
 {
-	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+	struct stasis_message_type *type;
 
 	type = ao2_alloc(sizeof(*type), message_type_dtor);
 	if (!type) {
@@ -66,11 +66,11 @@
 
 	type->name = ast_strdup(name);
 	if (!type->name) {
+		ao2_cleanup(type);
 		return NULL;
 	}
 	type->vtable = vtable;
 
-	ao2_ref(type, +1);
 	return type;
 }
 
@@ -87,6 +87,8 @@
 	struct stasis_message_type *type;
 	/*! Message content */
 	void *data;
+	/*! Where this message originated. */
+	struct ast_eid eid;
 };
 
 static void stasis_message_dtor(void *obj)
@@ -96,11 +98,11 @@
 	ao2_cleanup(message->data);
 }
 
-struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
 {
-	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+	struct stasis_message *message;
 
-	if (type == NULL || data == NULL) {
+	if (type == NULL || data == NULL || eid == NULL) {
 		return NULL;
 	}
 
@@ -114,9 +116,22 @@
 	message->type = type;
 	ao2_ref(data, +1);
 	message->data = data;
+	message->eid = *eid;
 
-	ao2_ref(message, +1);
 	return message;
+}
+
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+{
+	return stasis_message_create_full(type, data, &ast_eid_default);
+}
+
+const struct ast_eid *stasis_message_eid(const struct stasis_message *msg)
+{
+	if (msg == NULL) {
+		return NULL;
+	}
+	return &msg->eid;
 }
 
 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)

Modified: team/rmudgett/stasis_cache/tests/test_devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_devicestate.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_devicestate.c (original)
+++ team/rmudgett/stasis_cache/tests/test_devicestate.c Fri Feb 21 22:18:53 2014
@@ -401,6 +401,7 @@
 	ao2_callback(cache_dump, 0, remove_device_states_cb, NULL);
 }
 
+/* BUGBUG this test currently fails because the device state is storing the information in a different place now. */
 AST_TEST_DEFINE(device_state_aggregation_test)
 {
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);




More information about the svn-commits mailing list