[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r409698 - in /team/rmudgett/stasis_cac...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Mar 4 20:00:03 CST 2014


Author: rmudgett
Date: Tue Mar  4 19:59:50 2014
New Revision: 409698

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=409698
Log:
Address review feedback.

* Renamed internal/external cache entries to local/remote respectively.
stasis_cache_entry_get_internal() -> stasis_cache_entry_get_local()
stasis_cache_entry_get_external() -> stasis_cache_entry_get_remote()

* Renamed typedef cache_aggregate_post_fn to cache_aggregate_publish_fn
and added a topic parameter.

* Added note defining what is an aggregate message for the stasis cache
with a couple examples.

* Split stasis_cache_dump_full() into stasis_cache_dump_by_eid() and
stasis_cache_dump_all() to be inline with the stasis_cache_get functions.

* Straightened out devicestate.c subscription for non-cached events.

* Updated the message stasis unit test to test
stasis_message_create_full() and stasis_message_eid().

* Added cache_eid_aggregate stasis unit test to test the external entity
and aggregate caching support.

Modified:
    team/rmudgett/stasis_cache/include/asterisk/stasis.h
    team/rmudgett/stasis_cache/main/devicestate.c
    team/rmudgett/stasis_cache/main/stasis_cache.c
    team/rmudgett/stasis_cache/tests/test_devicestate.c
    team/rmudgett/stasis_cache/tests/test_stasis.c

Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Tue Mar  4 19:59:50 2014
@@ -309,8 +309,15 @@
  * \param data Immutable data that is the actual contents of the message
  * \param eid What entity originated this message. (NULL for aggregate)
  *
- * \return New message
- * \return \c NULL on error
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data.  e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device.  e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
+ * \retval New message
+ * \retval \c NULL on error
  *
  * \since 12.2.0
  */
@@ -704,20 +711,43 @@
  * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
  * if a new aggregate could not be calculated because of error.
  *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data.  e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device.  e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
  * \return New aggregate-snapshot calculated on success.
  * Caller has a reference on return.
  */
 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
 
 /*!
- * \brief Callback to post the aggregate cache entry message.
+ * \brief Callback to publish the aggregate cache entry message.
  * \since 12.2.0
  *
- * \param aggregate The aggregate shapshot message to post.
+ * \details
+ * Once an aggregate message is calculated.  This callback publishes the
+ * message so subscribers will know the new value of an aggregated state.
+ *
+ * \param topic The aggregate message may be published to this topic.
+ *        It is the topic to which the cache itself is subscribed.
+ * \param aggregate The aggregate shapshot message to publish.
+ *
+ * \note It is up to the function to determine if there is a better topic
+ * the aggregate message should be published over.
+ *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data.  e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device.  e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
  *
  * \return Nothing
  */
-typedef void (*cache_aggregate_post_fn)(struct stasis_message *aggregate);
+typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
 
 /*!
  * \brief Get the aggregate cache entry snapshot.
@@ -727,37 +757,44 @@
  *
  * \note A reference is not given to the returned pointer so don't unref it.
  *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data.  e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device.  e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
  * \retval Aggregate-snapshot in cache.
  * \retval NULL if not present.
  */
 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
 
 /*!
- * \brief Get the internal cache entry snapshot.
+ * \brief Get the local entity's cache entry snapshot.
  * \since 12.2.0
  *
- * \param entry Cache entry to get the internal snapshot.
+ * \param entry Cache entry to get the local entity's snapshot.
  *
  * \note A reference is not given to the returned pointer so don't unref it.
  *
  * \retval Internal-snapshot in cache.
  * \retval NULL if not present.
  */
-struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry);
-
-/*!
- * \brief Get the external cache entry snapshot by index.
+struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
+
+/*!
+ * \brief Get a remote entity's cache entry snapshot by index.
  * \since 12.2.0
  *
- * \param entry Cache entry to get the external snapshot.
- * \param idx Which external snapshot to get.
+ * \param entry Cache entry to get a remote entity's snapshot.
+ * \param idx Which remote entity's snapshot to get.
  *
  * \note A reference is not given to the returned pointer so don't unref it.
  *
- * \retval External-snapshot in cache.
+ * \retval Remote-entity-snapshot in cache.
  * \retval NULL if not present.
  */
-struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx);
+struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
 
 /*!
  * \brief Create a cache.
@@ -786,14 +823,21 @@
  *
  * \param id_fn Callback to extract the id from a snapshot message.
  * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
- * \param aggregate_post_fn Callback to post the aggregate cache entry.
+ * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
+ *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data.  e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device.  e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
  *
  * \retval New cache indexed by \a id_fn.
  * \retval \c NULL on error
  *
  * \since 12.2.0
  */
-struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn);
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
 
 /*!
  * \brief Create a topic which monitors and caches messages from another topic.
@@ -894,6 +938,13 @@
  * \param id Identity of the snapshot to retrieve.
  * \param eid Specific entity id to retrieve.  NULL for aggregate.
  *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data.  e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device.  e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
  * \retval Message from the cache.
  * \retval \c NULL if message is not found.
  *
@@ -902,7 +953,7 @@
 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
 
 /*!
- * \brief Retrieve all matching items from the cache.
+ * \brief Retrieve all matching entity items from the cache.
  * \since 12.2.0
  *
  * \param cache The cache to query.
@@ -933,12 +984,24 @@
  *
  * \param cache The cache to query.
  * \param type Type of message to dump (any type if \c NULL).
- * \param eid Specific entity id to retrieve.  NULL if any entities.
+ * \param eid Specific entity id to retrieve.  NULL for aggregate.
  *
  * \retval ao2_container containing all matches (must be unreffed by caller)
  * \retval \c NULL on allocation error
  */
-struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
+struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
+
+/*!
+ * \brief Dump all entity items from the cache to a subscription.
+ * \since 12.2.0
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to dump (any type if \c NULL).
+ *
+ * \retval ao2_container containing all matches (must be unreffed by caller)
+ * \retval \c NULL on allocation error
+ */
+struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
 
 /*! @} */
 

Modified: team/rmudgett/stasis_cache/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/devicestate.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/main/devicestate.c (original)
+++ team/rmudgett/stasis_cache/main/devicestate.c Tue Mar  4 19:59:50 2014
@@ -570,15 +570,6 @@
 {
 	struct ast_device_state_message *device_state;
 
-	if (stasis_cache_update_type() == stasis_message_type(msg)) {
-		struct stasis_cache_update *update = stasis_message_data(msg);
-
-		if (!update->new_snapshot) {
-			return;
-		}
-		msg = update->new_snapshot;
-	}
-
 	if (ast_device_state_message_type() != stasis_message_type(msg)) {
 		return;
 	}
@@ -738,14 +729,15 @@
 
 /*!
  * \internal
- * \brief Callback to post the aggregate device state cache entry message.
+ * \brief Callback to publish the aggregate device state cache entry message.
  * \since 12.2.0
  *
- * \param aggregate The aggregate shapshot message to post.
+ * \param cache_topic Caching topic the aggregate message may be published over.
+ * \param aggregate The aggregate shapshot message to publish.
  *
  * \return Nothing
  */
-static void device_state_aggregate_post(struct stasis_message *aggregate)
+static void device_state_aggregate_publish(struct stasis_topic *cache_topic, struct stasis_message *aggregate)
 {
 	const char *device;
 	struct stasis_topic *device_specific_topic;
@@ -787,14 +779,14 @@
 
 	/* Determine the new aggregate device state. */
 	ast_devstate_aggregate_init(&aggregate);
-	snapshot = stasis_cache_entry_get_internal(entry);
+	snapshot = stasis_cache_entry_get_local(entry);
 	if (snapshot) {
 		device_state = stasis_message_data(snapshot);
 		device = device_state->device;
 		ast_devstate_aggregate_add(&aggregate, device_state->state);
 	}
 	for (idx = 0; ; ++idx) {
-		snapshot = stasis_cache_entry_get_external(entry, idx);
+		snapshot = stasis_cache_entry_get_remote(entry, idx);
 		if (!snapshot) {
 			break;
 		}
@@ -870,22 +862,22 @@
 		return -1;
 	}
 	device_state_cache = stasis_cache_create_full(device_state_get_id,
-		device_state_aggregate_calc, device_state_aggregate_post);
+		device_state_aggregate_calc, device_state_aggregate_publish);
 	if (!device_state_cache) {
 		devstate_cleanup();
 		return -1;
 	}
-	device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all,
+	device_state_topic_cached = stasis_caching_topic_create(ast_device_state_topic_all(),
 		device_state_cache);
 	if (!device_state_topic_cached) {
 		devstate_cleanup();
 		return -1;
 	}
 
-	devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(),
+	devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
 		devstate_change_cb, NULL);
 	if (!devstate_message_sub) {
-		ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+		ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
 		devstate_cleanup();
 		return -1;
 	}

Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Tue Mar  4 19:59:50 2014
@@ -49,7 +49,7 @@
 	struct ao2_container *entries;
 	snapshot_get_id id_fn;
 	cache_aggregate_calc_fn aggregate_calc_fn;
-	cache_aggregate_post_fn aggregate_post_fn;
+	cache_aggregate_publish_fn aggregate_publish_fn;
 };
 
 /*! \internal */
@@ -136,10 +136,10 @@
 	struct cache_entry_key key;
 	/*! Aggregate snapshot of the stasis cache. */
 	struct stasis_message *aggregate;
-	/*! Internal snapshot of the stasis event. */
-	struct stasis_message *internal;
-	/*! External snapshots of the stasis event. */
-	AST_VECTOR(, struct stasis_message *) external;
+	/*! Local entity snapshot of the stasis event. */
+	struct stasis_message *local;
+	/*! Remote entity snapshots of the stasis event. */
+	AST_VECTOR(, struct stasis_message *) remote;
 };
 
 static void cache_entry_dtor(void *obj)
@@ -154,22 +154,22 @@
 
 	ao2_cleanup(entry->aggregate);
 	entry->aggregate = NULL;
-	ao2_cleanup(entry->internal);
-	entry->internal = NULL;
-
-	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
-		struct stasis_message *external;
-
-		external = AST_VECTOR_GET(&entry->external, idx);
-		ao2_cleanup(external);
-	}
-	AST_VECTOR_FREE(&entry->external);
+	ao2_cleanup(entry->local);
+	entry->local = NULL;
+
+	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
+		struct stasis_message *remote;
+
+		remote = AST_VECTOR_GET(&entry->remote, idx);
+		ao2_cleanup(remote);
+	}
+	AST_VECTOR_FREE(&entry->remote);
 }
 
 static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
 {
 	struct stasis_cache_entry *entry;
-	int is_external;
+	int is_remote;
 
 	ast_assert(type != NULL);
 	ast_assert(id != NULL);
@@ -188,19 +188,19 @@
 	}
 	entry->key.type = ao2_bump(type);
 
-	is_external = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
-	if (AST_VECTOR_INIT(&entry->external, is_external)) {
+	is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
+	if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
 		ao2_cleanup(entry);
 		return NULL;
 	}
 
-	if (is_external) {
-		if (AST_VECTOR_APPEND(&entry->external, snapshot)) {
+	if (is_remote) {
+		if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
 			ao2_cleanup(entry);
 			return NULL;
 		}
 	} else {
-		entry->internal = snapshot;
+		entry->local = snapshot;
 	}
 	ao2_bump(snapshot);
 
@@ -279,7 +279,8 @@
 }
 
 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
-	cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn)
+	cache_aggregate_calc_fn aggregate_calc_fn,
+	cache_aggregate_publish_fn aggregate_publish_fn)
 {
 	struct stasis_cache *cache;
 
@@ -298,7 +299,7 @@
 
 	cache->id_fn = id_fn;
 	cache->aggregate_calc_fn = aggregate_calc_fn;
-	cache->aggregate_post_fn = aggregate_post_fn;
+	cache->aggregate_publish_fn = aggregate_publish_fn;
 
 	return cache;
 }
@@ -313,15 +314,15 @@
 	return entry->aggregate;
 }
 
-struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry)
-{
-	return entry->internal;
-}
-
-struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx)
-{
-	if (idx < AST_VECTOR_SIZE(&entry->external)) {
-		return AST_VECTOR_GET(&entry->external, idx);
+struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
+{
+	return entry->local;
+}
+
+struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
+{
+	if (idx < AST_VECTOR_SIZE(&entry->remote)) {
+		return AST_VECTOR_GET(&entry->remote, idx);
 	}
 	return NULL;
 }
@@ -363,28 +364,28 @@
 static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
 {
 	struct stasis_message *old_snapshot;
-	int is_external;
-
-	is_external = ast_eid_cmp(eid, &ast_eid_default);
-	if (!is_external) {
-		old_snapshot = cached_entry->internal;
-		cached_entry->internal = NULL;
+	int is_remote;
+
+	is_remote = ast_eid_cmp(eid, &ast_eid_default);
+	if (!is_remote) {
+		old_snapshot = cached_entry->local;
+		cached_entry->local = NULL;
 	} else {
 		int idx;
 
 		old_snapshot = NULL;
-		for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+		for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
 			struct stasis_message *cur;
 
-			cur = AST_VECTOR_GET(&cached_entry->external, idx);
+			cur = AST_VECTOR_GET(&cached_entry->remote, idx);
 			if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
-				old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+				old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
 				break;
 			}
 		}
 	}
 
-	if (!cached_entry->internal && !AST_VECTOR_SIZE(&cached_entry->external)) {
+	if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
 		ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
 	}
 
@@ -404,27 +405,27 @@
 static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 {
 	struct stasis_message *old_snapshot;
-	int is_external;
+	int is_remote;
 	int idx;
 
-	is_external = ast_eid_cmp(eid, &ast_eid_default);
-	if (!is_external) {
-		old_snapshot = cached_entry->internal;
-		cached_entry->internal = ao2_bump(new_snapshot);
+	is_remote = ast_eid_cmp(eid, &ast_eid_default);
+	if (!is_remote) {
+		old_snapshot = cached_entry->local;
+		cached_entry->local = ao2_bump(new_snapshot);
 		return old_snapshot;
 	}
 
 	old_snapshot = NULL;
-	for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+	for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
 		struct stasis_message *cur;
 
-		cur = AST_VECTOR_GET(&cached_entry->external, idx);
+		cur = AST_VECTOR_GET(&cached_entry->remote, idx);
 		if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
-			old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+			old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
 			break;
 		}
 	}
-	if (!AST_VECTOR_APPEND(&cached_entry->external, new_snapshot)) {
+	if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
 		ao2_bump(new_snapshot);
 	}
 
@@ -496,7 +497,7 @@
  * \param entry Cache entry to use.
  *
  * \retval 0 on success.
- * \retval -1 on error.
+ * \retval non-zero on error.
  */
 static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
 {
@@ -508,14 +509,14 @@
 
 	/* The aggregate snapshot is not a snapshot from an entity. */
 
-	if (entry->internal) {
-		err |= !ao2_link(snapshots, entry->internal);
-	}
-
-	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+	if (entry->local) {
+		err |= !ao2_link(snapshots, entry->local);
+	}
+
+	for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
 		struct stasis_message *snapshot;
 
-		snapshot = AST_VECTOR_GET(&entry->external, idx);
+		snapshot = AST_VECTOR_GET(&entry->remote, idx);
 		err |= !ao2_link(snapshots, snapshot);
 	}
 
@@ -565,7 +566,7 @@
  */
 static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
 {
-	int is_external;
+	int is_remote;
 	int idx;
 
 	if (!eid) {
@@ -574,15 +575,15 @@
 	}
 
 	/* Get snapshot with specific eid. */
-	is_external = ast_eid_cmp(eid, &ast_eid_default);
-	if (!is_external) {
-		return entry->internal;
-	}
-
-	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+	is_remote = ast_eid_cmp(eid, &ast_eid_default);
+	if (!is_remote) {
+		return entry->local;
+	}
+
+	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
 		struct stasis_message *cur;
 
-		cur = AST_VECTOR_GET(&entry->external, idx);
+		cur = AST_VECTOR_GET(&entry->remote, idx);
 		if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
 			return cur;
 		}
@@ -621,33 +622,33 @@
 }
 
 struct cache_dump_data {
-	struct ao2_container *cached;
+	struct ao2_container *container;
 	struct stasis_message_type *type;
 	const struct ast_eid *eid;
 };
 
-static int cache_dump_cb(void *obj, void *arg, int flags)
+static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
 {
 	struct cache_dump_data *cache_dump = arg;
 	struct stasis_cache_entry *entry = obj;
 
 	if (!cache_dump->type || entry->key.type == cache_dump->type) {
-		if (cache_dump->eid) {
-			struct stasis_message *snapshot;
-
-			snapshot = cache_entry_by_eid(entry, cache_dump->eid);
-			if (snapshot) {
-				ao2_link(cache_dump->cached, snapshot);
+		struct stasis_message *snapshot;
+
+		snapshot = cache_entry_by_eid(entry, cache_dump->eid);
+		if (snapshot) {
+			if (!ao2_link(cache_dump->container, snapshot)) {
+				ao2_cleanup(cache_dump->container);
+				cache_dump->container = NULL;
+				return CMP_STOP;
 			}
-		} else {
-			cache_entry_dump(cache_dump->cached, entry);
 		}
 	}
 
 	return 0;
 }
 
-struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
+struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 {
 	struct cache_dump_data cache_dump;
 
@@ -656,18 +657,52 @@
 
 	cache_dump.eid = eid;
 	cache_dump.type = type;
-	cache_dump.cached = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
-	if (!cache_dump.cached) {
-		return NULL;
-	}
-
-	ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
-	return cache_dump.cached;
+	cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
+	if (!cache_dump.container) {
+		return NULL;
+	}
+
+	ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
+	return cache_dump.container;
 }
 
 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
 {
-	return stasis_cache_dump_full(cache, type, &ast_eid_default);
+	return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
+}
+
+static int cache_dump_all_cb(void *obj, void *arg, int flags)
+{
+	struct cache_dump_data *cache_dump = arg;
+	struct stasis_cache_entry *entry = obj;
+
+	if (!cache_dump->type || entry->key.type == cache_dump->type) {
+		if (cache_entry_dump(cache_dump->container, entry)) {
+			ao2_cleanup(cache_dump->container);
+			cache_dump->container = NULL;
+			return CMP_STOP;
+		}
+	}
+
+	return 0;
+}
+
+struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
+{
+	struct cache_dump_data cache_dump;
+
+	ast_assert(cache != NULL);
+	ast_assert(cache->entries != NULL);
+
+	cache_dump.eid = NULL;
+	cache_dump.type = type;
+	cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
+	if (!cache_dump.container) {
+		return NULL;
+	}
+
+	ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
+	return cache_dump.container;
 }
 
 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
@@ -781,8 +816,9 @@
 		}
 
 		if (snapshots.aggregate_old != snapshots.aggregate_new) {
-			if (snapshots.aggregate_new && caching_topic->cache->aggregate_post_fn) {
-				caching_topic->cache->aggregate_post_fn(snapshots.aggregate_new);
+			if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
+				caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
+					snapshots.aggregate_new);
 			}
 			update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
 			if (update) {

Modified: team/rmudgett/stasis_cache/tests/test_devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_devicestate.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_devicestate.c (original)
+++ team/rmudgett/stasis_cache/tests/test_devicestate.c Tue Mar  4 19:59:50 2014
@@ -403,7 +403,7 @@
 	struct ao2_container *cache_dump;
 
 	/* remove all device states created during this test */
-	cache_dump = stasis_cache_dump_full(ast_device_state_cache(), NULL, NULL);
+	cache_dump = stasis_cache_dump_all(ast_device_state_cache(), NULL);
 	if (!cache_dump) {
 		return;
 	}

Modified: team/rmudgett/stasis_cache/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_stasis.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_stasis.c (original)
+++ team/rmudgett/stasis_cache/tests/test_stasis.c Tue Mar  4 19:59:50 2014
@@ -94,11 +94,13 @@
 AST_TEST_DEFINE(message)
 {
 	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
 	RAII_VAR(char *, data, NULL, ao2_cleanup);
 	char *expected = "SomeData";
 	struct timeval expected_timestamp;
 	struct timeval time_diff;
+	struct ast_eid foreign_eid;
 
 	switch (cmd) {
 	case TEST_INIT:
@@ -112,29 +114,42 @@
 	}
 
 
+	memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
+
 	type = stasis_message_type_create("SomeMessage", NULL);
 
-	ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
-	ast_test_validate(test, NULL == stasis_message_create(type, NULL));
+	ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
+	ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
 
 	data = ao2_alloc(strlen(expected) + 1, NULL);
-	strcpy(data, expected);
+	strcpy(data, expected);/* Safe */
 	expected_timestamp = ast_tvnow();
-	uut = stasis_message_create(type, data);
-
-	ast_test_validate(test, NULL != uut);
-	ast_test_validate(test, type == stasis_message_type(uut));
-	ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
-	ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
-
-	time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
+	uut1 = stasis_message_create_full(type, data, &foreign_eid);
+	uut2 = stasis_message_create_full(type, data, NULL);
+
+	ast_test_validate(test, NULL != uut1);
+	ast_test_validate(test, NULL != uut2);
+	ast_test_validate(test, type == stasis_message_type(uut1));
+	ast_test_validate(test, type == stasis_message_type(uut2));
+	ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
+	ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
+	ast_test_validate(test, NULL != stasis_message_eid(uut1));
+	ast_test_validate(test, NULL == stasis_message_eid(uut2));
+	ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
+
+	ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
+
+	time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
 	/* 10ms is certainly long enough for the two calls to complete */
 	ast_test_validate(test, time_diff.tv_sec == 0);
 	ast_test_validate(test, time_diff.tv_usec < 10000);
 
-	ao2_ref(uut, -1);
-	uut = NULL;
-	ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
+	ao2_ref(uut1, -1);
+	uut1 = NULL;
+	ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
+	ao2_ref(uut2, -1);
+	uut2 = NULL;
+	ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
 
 	return AST_TEST_PASS;
 }
@@ -643,11 +658,12 @@
 static void cache_test_data_dtor(void *obj)
 {
 	struct cache_test_data *data = obj;
+
 	ast_free(data->id);
 	ast_free(data->value);
 }
 
-static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
+static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
 {
 	RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
 
@@ -665,7 +681,12 @@
 		return NULL;
 	}
 
-	return stasis_message_create(type, data);
+	return stasis_message_create_full(type, data, eid);
+}
+
+static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
+{
+	return cache_test_message_create_full(type, name, value, &ast_eid_default);
 }
 
 static const char *cache_test_data_id(struct stasis_message *message)
@@ -676,6 +697,81 @@
 		return NULL;
 	}
 	return cachable->id;
+}
+
+static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
+{
+	struct stasis_message *aggregate_snapshot;
+	struct stasis_message *snapshot;
+	struct stasis_message_type *type = NULL;
+	struct cache_test_data *test_data = NULL;
+	int idx;
+	int accumulated = 0;
+	char aggregate_str[30];
+
+	/* Accumulate the aggregate value. */
+	snapshot = stasis_cache_entry_get_local(entry);
+	if (snapshot) {
+		type = stasis_message_type(snapshot);
+		test_data = stasis_message_data(snapshot);
+		accumulated += atoi(test_data->value);
+	}
+	for (idx = 0; ; ++idx) {
+		snapshot = stasis_cache_entry_get_remote(entry, idx);
+		if (!snapshot) {
+			break;
+		}
+
+		type = stasis_message_type(snapshot);
+		test_data = stasis_message_data(snapshot);
+		accumulated += atoi(test_data->value);
+	}
+
+	if (!test_data) {
+		/* There are no test entries cached.  Delete the aggregate. */
+		return NULL;
+	}
+
+	snapshot = stasis_cache_entry_get_aggregate(entry);
+	if (snapshot) {
+		type = stasis_message_type(snapshot);
+		test_data = stasis_message_data(snapshot);
+		if (accumulated == atoi(test_data->value)) {
+			/* Aggregate test entry did not change. */
+			return ao2_bump(snapshot);
+		}
+	}
+
+	snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
+	aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
+	if (!aggregate_snapshot) {
+		/* Bummer.  We have to keep the old aggregate snapshot. */
+		ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
+		return ao2_bump(snapshot);
+	}
+
+	return aggregate_snapshot;
+}
+
+static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
+{
+	stasis_publish(topic, aggregate);
+}
+
+static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
+{
+	RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
+	struct cache_test_data *test_data;
+
+	aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
+	if (!aggregate) {
+		/* No aggregate, return true if given no value. */
+		return !value;
+	}
+
+	/* Return true if the given value matches the aggregate value. */
+	test_data = stasis_message_data(aggregate);
+	return value && !strcmp(value, test_data->value);
 }
 
 AST_TEST_DEFINE(cache_filter)
@@ -845,8 +941,8 @@
 	case TEST_INIT:
 		info->name = __func__;
 		info->category = test_category;
-		info->summary = "Test passing messages through cache topic unscathed.";
-		info->description = "Test passing messages through cache topic unscathed.";
+		info->summary = "Test cache dump routines.";
+		info->description = "Test cache dump routines.";
 		return AST_TEST_NOT_RUN;
 	case TEST_EXECUTE:
 		break;
@@ -933,6 +1029,266 @@
 	ao2_cleanup(cache_dump);
 	cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
 	ast_test_validate(test, 0 == ao2_container_count(cache_dump));
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache_eid_aggregate)
+{
+	RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+	RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
+	RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
+	int actual_len;
+	struct ao2_iterator i;
+	void *obj;
+	struct ast_eid foreign_eid1;
+	struct ast_eid foreign_eid2;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test cache eid and aggregate support.";
+		info->description = "Test cache eid and aggregate support.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
+	memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
+
+	cache_type = stasis_message_type_create("Cacheable", NULL);
+	ast_test_validate(test, NULL != cache_type);
+
+	topic = stasis_topic_create("SomeTopic");
+	ast_test_validate(test, NULL != topic);
+
+	/* To consume events published to the topic. */
+	topic_consumer = consumer_create(1);
+	ast_test_validate(test, NULL != topic_consumer);
+
+	topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
+	ast_test_validate(test, NULL != topic_sub);
+	ao2_ref(topic_consumer, +1);
+
+	cache = stasis_cache_create_full(cache_test_data_id,
+		cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
+	ast_test_validate(test, NULL != cache);
+
+	caching_topic = stasis_caching_topic_create(topic, cache);
+	ast_test_validate(test, NULL != caching_topic);
+
+	/* To consume update events published to the caching_topic. */
+	cache_consumer = consumer_create(1);
+	ast_test_validate(test, NULL != cache_consumer);
+
+	cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
+	ast_test_validate(test, NULL != cache_sub);
+	ao2_ref(cache_consumer, +1);
+
+	/* Create test messages. */
+	test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
+	ast_test_validate(test, NULL != test_message1_1);
+	test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
+	ast_test_validate(test, NULL != test_message2_1);
+	test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
+	ast_test_validate(test, NULL != test_message2_2);
+	test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
+	ast_test_validate(test, NULL != test_message2_3);
+	test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
+	ast_test_validate(test, NULL != test_message2_4);
+
+	/* Post some snapshots */
+	stasis_publish(topic, test_message1_1);
+	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
+	stasis_publish(topic, test_message2_1);
+	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
+	stasis_publish(topic, test_message2_2);
+	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
+
+	actual_len = consumer_wait_for(cache_consumer, 6);
+	ast_test_validate(test, 6 == actual_len);
+	actual_len = consumer_wait_for(topic_consumer, 6);
+	ast_test_validate(test, 6 == actual_len);
+
+	/* Check the cache */
+	ao2_cleanup(cache_dump);
+	cache_dump = stasis_cache_dump_all(cache, NULL);
+	ast_test_validate(test, NULL != cache_dump);
+	ast_test_validate(test, 3 == ao2_container_count(cache_dump));
+	i = ao2_iterator_init(cache_dump, 0);
+	while ((obj = ao2_iterator_next(&i))) {
+		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
+
+		ast_test_validate(test,
+			actual_cache_entry == test_message1_1
+			|| actual_cache_entry == test_message2_1
+			|| actual_cache_entry == test_message2_2);
+	}
+	ao2_iterator_destroy(&i);
+
+	/* Check the local cached items */
+	ao2_cleanup(cache_dump);
+	cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
+	ast_test_validate(test, NULL != cache_dump);
+	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+	i = ao2_iterator_init(cache_dump, 0);
+	while ((obj = ao2_iterator_next(&i))) {
+		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
+
+		ast_test_validate(test,
+			actual_cache_entry == test_message1_1
+			|| actual_cache_entry == test_message2_1);
+	}
+	ao2_iterator_destroy(&i);
+
+	/* Post snapshot 2 from another eid. */
+	stasis_publish(topic, test_message2_3);
+	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
+
+	actual_len = consumer_wait_for(cache_consumer, 8);
+	ast_test_validate(test, 8 == actual_len);
+	actual_len = consumer_wait_for(topic_consumer, 8);
+	ast_test_validate(test, 8 == actual_len);
+
+	/* Check the cache */
+	ao2_cleanup(cache_dump);
+	cache_dump = stasis_cache_dump_all(cache, NULL);
+	ast_test_validate(test, NULL != cache_dump);
+	ast_test_validate(test, 4 == ao2_container_count(cache_dump));
+	i = ao2_iterator_init(cache_dump, 0);
+	while ((obj = ao2_iterator_next(&i))) {
+		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
+
+		ast_test_validate(test,
+			actual_cache_entry == test_message1_1
+			|| actual_cache_entry == test_message2_1
+			|| actual_cache_entry == test_message2_2
+			|| actual_cache_entry == test_message2_3);
+	}
+	ao2_iterator_destroy(&i);
+
+	/* Check the remote cached items */
+	ao2_cleanup(cache_dump);
+	cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
+	ast_test_validate(test, NULL != cache_dump);
+	ast_test_validate(test, 1 == ao2_container_count(cache_dump));
+	i = ao2_iterator_init(cache_dump, 0);
+	while ((obj = ao2_iterator_next(&i))) {

[... 120 lines stripped ...]



More information about the asterisk-commits mailing list