[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r409698 - in /team/rmudgett/stasis_cac...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Mar 4 20:00:03 CST 2014
Author: rmudgett
Date: Tue Mar 4 19:59:50 2014
New Revision: 409698
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=409698
Log:
Address review feedback.
* Renamed internal/external cache entries to local/remote respectively.
stasis_cache_entry_get_internal() -> stasis_cache_entry_get_local()
stasis_cache_entry_get_external() -> stasis_cache_entry_get_remote()
* Renamed typedef cache_aggregate_post_fn to cache_aggregate_publish_fn
and added a topic parameter.
* Added note defining what is an aggregate message for the stasis cache
with a couple examples.
* Split stasis_cache_dump_full() into stasis_cache_dump_by_eid() and
stasis_cache_dump_all() to be in line with the stasis_cache_get functions.
* Straightened out devicestate.c subscription for non-cached events.
* Updated the message stasis unit test to test
stasis_message_create_full() and stasis_message_eid().
* Added cache_eid_aggregate stasis unit test to test the external entity
and aggregate caching support.
Modified:
team/rmudgett/stasis_cache/include/asterisk/stasis.h
team/rmudgett/stasis_cache/main/devicestate.c
team/rmudgett/stasis_cache/main/stasis_cache.c
team/rmudgett/stasis_cache/tests/test_devicestate.c
team/rmudgett/stasis_cache/tests/test_stasis.c
Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Tue Mar 4 19:59:50 2014
@@ -309,8 +309,15 @@
* \param data Immutable data that is the actual contents of the message
* \param eid What entity originated this message. (NULL for aggregate)
*
- * \return New message
- * \return \c NULL on error
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data. e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device. e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
+ * \retval New message
+ * \retval \c NULL on error
*
* \since 12.2.0
*/
@@ -704,20 +711,43 @@
* \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
* if a new aggregate could not be calculated because of error.
*
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data. e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device. e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
* \return New aggregate-snapshot calculated on success.
* Caller has a reference on return.
*/
typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
/*!
- * \brief Callback to post the aggregate cache entry message.
+ * \brief Callback to publish the aggregate cache entry message.
* \since 12.2.0
*
- * \param aggregate The aggregate shapshot message to post.
+ * \details
+ * Once an aggregate message is calculated, this callback publishes the
+ * message so subscribers will know the new value of an aggregated state.
+ *
+ * \param topic The aggregate message may be published to this topic.
+ * It is the topic to which the cache itself is subscribed.
+ * \param aggregate The aggregate snapshot message to publish.
+ *
+ * \note It is up to the function to determine if there is a better topic
+ * the aggregate message should be published over.
+ *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data. e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device. e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
*
* \return Nothing
*/
-typedef void (*cache_aggregate_post_fn)(struct stasis_message *aggregate);
+typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
/*!
* \brief Get the aggregate cache entry snapshot.
@@ -727,37 +757,44 @@
*
* \note A reference is not given to the returned pointer so don't unref it.
*
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data. e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device. e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
* \retval Aggregate-snapshot in cache.
* \retval NULL if not present.
*/
struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
/*!
- * \brief Get the internal cache entry snapshot.
+ * \brief Get the local entity's cache entry snapshot.
* \since 12.2.0
*
- * \param entry Cache entry to get the internal snapshot.
+ * \param entry Cache entry to get the local entity's snapshot.
*
* \note A reference is not given to the returned pointer so don't unref it.
*
* \retval Internal-snapshot in cache.
* \retval NULL if not present.
*/
-struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry);
-
-/*!
- * \brief Get the external cache entry snapshot by index.
+struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
+
+/*!
+ * \brief Get a remote entity's cache entry snapshot by index.
* \since 12.2.0
*
- * \param entry Cache entry to get the external snapshot.
- * \param idx Which external snapshot to get.
+ * \param entry Cache entry to get a remote entity's snapshot.
+ * \param idx Which remote entity's snapshot to get.
*
* \note A reference is not given to the returned pointer so don't unref it.
*
- * \retval External-snapshot in cache.
+ * \retval Remote-entity-snapshot in cache.
* \retval NULL if not present.
*/
-struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx);
+struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
/*!
* \brief Create a cache.
@@ -786,14 +823,21 @@
*
* \param id_fn Callback to extract the id from a snapshot message.
* \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
- * \param aggregate_post_fn Callback to post the aggregate cache entry.
+ * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
+ *
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data. e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device. e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
*
* \retval New cache indexed by \a id_fn.
* \retval \c NULL on error
*
* \since 12.2.0
*/
-struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn);
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
/*!
* \brief Create a topic which monitors and caches messages from another topic.
@@ -894,6 +938,13 @@
* \param id Identity of the snapshot to retrieve.
* \param eid Specific entity id to retrieve. NULL for aggregate.
*
+ * \note An aggregate message is a combined representation of the local
+ * and remote entities publishing the message data. e.g., An aggregate
+ * device state represents the combined device state from the local and
+ * any remote entities publishing state for a device. e.g., An aggregate
+ * MWI message is the old/new MWI counts accumulated from the local and
+ * any remote entities publishing to a mailbox.
+ *
* \retval Message from the cache.
* \retval \c NULL if message is not found.
*
@@ -902,7 +953,7 @@
struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
/*!
- * \brief Retrieve all matching items from the cache.
+ * \brief Retrieve all matching entity items from the cache.
* \since 12.2.0
*
* \param cache The cache to query.
@@ -933,12 +984,24 @@
*
* \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
- * \param eid Specific entity id to retrieve. NULL if any entities.
+ * \param eid Specific entity id to retrieve. NULL for aggregate.
*
* \retval ao2_container containing all matches (must be unreffed by caller)
* \retval \c NULL on allocation error
*/
-struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
+struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
+
+/*!
+ * \brief Dump all entity items from the cache to a subscription.
+ * \since 12.2.0
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to dump (any type if \c NULL).
+ *
+ * \retval ao2_container containing all matches (must be unreffed by caller)
+ * \retval \c NULL on allocation error
+ */
+struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
/*! @} */
Modified: team/rmudgett/stasis_cache/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/devicestate.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/main/devicestate.c (original)
+++ team/rmudgett/stasis_cache/main/devicestate.c Tue Mar 4 19:59:50 2014
@@ -570,15 +570,6 @@
{
struct ast_device_state_message *device_state;
- if (stasis_cache_update_type() == stasis_message_type(msg)) {
- struct stasis_cache_update *update = stasis_message_data(msg);
-
- if (!update->new_snapshot) {
- return;
- }
- msg = update->new_snapshot;
- }
-
if (ast_device_state_message_type() != stasis_message_type(msg)) {
return;
}
@@ -738,14 +729,15 @@
/*!
* \internal
- * \brief Callback to post the aggregate device state cache entry message.
+ * \brief Callback to publish the aggregate device state cache entry message.
* \since 12.2.0
*
- * \param aggregate The aggregate shapshot message to post.
+ * \param cache_topic Caching topic the aggregate message may be published over.
+ * \param aggregate The aggregate snapshot message to publish.
*
* \return Nothing
*/
-static void device_state_aggregate_post(struct stasis_message *aggregate)
+static void device_state_aggregate_publish(struct stasis_topic *cache_topic, struct stasis_message *aggregate)
{
const char *device;
struct stasis_topic *device_specific_topic;
@@ -787,14 +779,14 @@
/* Determine the new aggregate device state. */
ast_devstate_aggregate_init(&aggregate);
- snapshot = stasis_cache_entry_get_internal(entry);
+ snapshot = stasis_cache_entry_get_local(entry);
if (snapshot) {
device_state = stasis_message_data(snapshot);
device = device_state->device;
ast_devstate_aggregate_add(&aggregate, device_state->state);
}
for (idx = 0; ; ++idx) {
- snapshot = stasis_cache_entry_get_external(entry, idx);
+ snapshot = stasis_cache_entry_get_remote(entry, idx);
if (!snapshot) {
break;
}
@@ -870,22 +862,22 @@
return -1;
}
device_state_cache = stasis_cache_create_full(device_state_get_id,
- device_state_aggregate_calc, device_state_aggregate_post);
+ device_state_aggregate_calc, device_state_aggregate_publish);
if (!device_state_cache) {
devstate_cleanup();
return -1;
}
- device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all,
+ device_state_topic_cached = stasis_caching_topic_create(ast_device_state_topic_all(),
device_state_cache);
if (!device_state_topic_cached) {
devstate_cleanup();
return -1;
}
- devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(),
+ devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(),
devstate_change_cb, NULL);
if (!devstate_message_sub) {
- ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+ ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n");
devstate_cleanup();
return -1;
}
Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Tue Mar 4 19:59:50 2014
@@ -49,7 +49,7 @@
struct ao2_container *entries;
snapshot_get_id id_fn;
cache_aggregate_calc_fn aggregate_calc_fn;
- cache_aggregate_post_fn aggregate_post_fn;
+ cache_aggregate_publish_fn aggregate_publish_fn;
};
/*! \internal */
@@ -136,10 +136,10 @@
struct cache_entry_key key;
/*! Aggregate snapshot of the stasis cache. */
struct stasis_message *aggregate;
- /*! Internal snapshot of the stasis event. */
- struct stasis_message *internal;
- /*! External snapshots of the stasis event. */
- AST_VECTOR(, struct stasis_message *) external;
+ /*! Local entity snapshot of the stasis event. */
+ struct stasis_message *local;
+ /*! Remote entity snapshots of the stasis event. */
+ AST_VECTOR(, struct stasis_message *) remote;
};
static void cache_entry_dtor(void *obj)
@@ -154,22 +154,22 @@
ao2_cleanup(entry->aggregate);
entry->aggregate = NULL;
- ao2_cleanup(entry->internal);
- entry->internal = NULL;
-
- for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
- struct stasis_message *external;
-
- external = AST_VECTOR_GET(&entry->external, idx);
- ao2_cleanup(external);
- }
- AST_VECTOR_FREE(&entry->external);
+ ao2_cleanup(entry->local);
+ entry->local = NULL;
+
+ for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
+ struct stasis_message *remote;
+
+ remote = AST_VECTOR_GET(&entry->remote, idx);
+ ao2_cleanup(remote);
+ }
+ AST_VECTOR_FREE(&entry->remote);
}
static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
{
struct stasis_cache_entry *entry;
- int is_external;
+ int is_remote;
ast_assert(type != NULL);
ast_assert(id != NULL);
@@ -188,19 +188,19 @@
}
entry->key.type = ao2_bump(type);
- is_external = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
- if (AST_VECTOR_INIT(&entry->external, is_external)) {
+ is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
+ if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
ao2_cleanup(entry);
return NULL;
}
- if (is_external) {
- if (AST_VECTOR_APPEND(&entry->external, snapshot)) {
+ if (is_remote) {
+ if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
ao2_cleanup(entry);
return NULL;
}
} else {
- entry->internal = snapshot;
+ entry->local = snapshot;
}
ao2_bump(snapshot);
@@ -279,7 +279,8 @@
}
struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
- cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn)
+ cache_aggregate_calc_fn aggregate_calc_fn,
+ cache_aggregate_publish_fn aggregate_publish_fn)
{
struct stasis_cache *cache;
@@ -298,7 +299,7 @@
cache->id_fn = id_fn;
cache->aggregate_calc_fn = aggregate_calc_fn;
- cache->aggregate_post_fn = aggregate_post_fn;
+ cache->aggregate_publish_fn = aggregate_publish_fn;
return cache;
}
@@ -313,15 +314,15 @@
return entry->aggregate;
}
-struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry)
-{
- return entry->internal;
-}
-
-struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx)
-{
- if (idx < AST_VECTOR_SIZE(&entry->external)) {
- return AST_VECTOR_GET(&entry->external, idx);
+struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
+{
+ return entry->local;
+}
+
+struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
+{
+ if (idx < AST_VECTOR_SIZE(&entry->remote)) {
+ return AST_VECTOR_GET(&entry->remote, idx);
}
return NULL;
}
@@ -363,28 +364,28 @@
static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
{
struct stasis_message *old_snapshot;
- int is_external;
-
- is_external = ast_eid_cmp(eid, &ast_eid_default);
- if (!is_external) {
- old_snapshot = cached_entry->internal;
- cached_entry->internal = NULL;
+ int is_remote;
+
+ is_remote = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_remote) {
+ old_snapshot = cached_entry->local;
+ cached_entry->local = NULL;
} else {
int idx;
old_snapshot = NULL;
- for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+ for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
struct stasis_message *cur;
- cur = AST_VECTOR_GET(&cached_entry->external, idx);
+ cur = AST_VECTOR_GET(&cached_entry->remote, idx);
if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
- old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+ old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
break;
}
}
}
- if (!cached_entry->internal && !AST_VECTOR_SIZE(&cached_entry->external)) {
+ if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
}
@@ -404,27 +405,27 @@
static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
{
struct stasis_message *old_snapshot;
- int is_external;
+ int is_remote;
int idx;
- is_external = ast_eid_cmp(eid, &ast_eid_default);
- if (!is_external) {
- old_snapshot = cached_entry->internal;
- cached_entry->internal = ao2_bump(new_snapshot);
+ is_remote = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_remote) {
+ old_snapshot = cached_entry->local;
+ cached_entry->local = ao2_bump(new_snapshot);
return old_snapshot;
}
old_snapshot = NULL;
- for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+ for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
struct stasis_message *cur;
- cur = AST_VECTOR_GET(&cached_entry->external, idx);
+ cur = AST_VECTOR_GET(&cached_entry->remote, idx);
if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
- old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+ old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
break;
}
}
- if (!AST_VECTOR_APPEND(&cached_entry->external, new_snapshot)) {
+ if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
ao2_bump(new_snapshot);
}
@@ -496,7 +497,7 @@
* \param entry Cache entry to use.
*
* \retval 0 on success.
- * \retval -1 on error.
+ * \retval non-zero on error.
*/
static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
{
@@ -508,14 +509,14 @@
/* The aggregate snapshot is not a snapshot from an entity. */
- if (entry->internal) {
- err |= !ao2_link(snapshots, entry->internal);
- }
-
- for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+ if (entry->local) {
+ err |= !ao2_link(snapshots, entry->local);
+ }
+
+ for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
struct stasis_message *snapshot;
- snapshot = AST_VECTOR_GET(&entry->external, idx);
+ snapshot = AST_VECTOR_GET(&entry->remote, idx);
err |= !ao2_link(snapshots, snapshot);
}
@@ -565,7 +566,7 @@
*/
static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
{
- int is_external;
+ int is_remote;
int idx;
if (!eid) {
@@ -574,15 +575,15 @@
}
/* Get snapshot with specific eid. */
- is_external = ast_eid_cmp(eid, &ast_eid_default);
- if (!is_external) {
- return entry->internal;
- }
-
- for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+ is_remote = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_remote) {
+ return entry->local;
+ }
+
+ for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
struct stasis_message *cur;
- cur = AST_VECTOR_GET(&entry->external, idx);
+ cur = AST_VECTOR_GET(&entry->remote, idx);
if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
return cur;
}
@@ -621,33 +622,33 @@
}
struct cache_dump_data {
- struct ao2_container *cached;
+ struct ao2_container *container;
struct stasis_message_type *type;
const struct ast_eid *eid;
};
-static int cache_dump_cb(void *obj, void *arg, int flags)
+static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
{
struct cache_dump_data *cache_dump = arg;
struct stasis_cache_entry *entry = obj;
if (!cache_dump->type || entry->key.type == cache_dump->type) {
- if (cache_dump->eid) {
- struct stasis_message *snapshot;
-
- snapshot = cache_entry_by_eid(entry, cache_dump->eid);
- if (snapshot) {
- ao2_link(cache_dump->cached, snapshot);
+ struct stasis_message *snapshot;
+
+ snapshot = cache_entry_by_eid(entry, cache_dump->eid);
+ if (snapshot) {
+ if (!ao2_link(cache_dump->container, snapshot)) {
+ ao2_cleanup(cache_dump->container);
+ cache_dump->container = NULL;
+ return CMP_STOP;
}
- } else {
- cache_entry_dump(cache_dump->cached, entry);
}
}
return 0;
}
-struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
+struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
{
struct cache_dump_data cache_dump;
@@ -656,18 +657,52 @@
cache_dump.eid = eid;
cache_dump.type = type;
- cache_dump.cached = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
- if (!cache_dump.cached) {
- return NULL;
- }
-
- ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
- return cache_dump.cached;
+ cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
+ if (!cache_dump.container) {
+ return NULL;
+ }
+
+ ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
+ return cache_dump.container;
}
struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
{
- return stasis_cache_dump_full(cache, type, &ast_eid_default);
+ return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
+}
+
+static int cache_dump_all_cb(void *obj, void *arg, int flags)
+{
+ struct cache_dump_data *cache_dump = arg;
+ struct stasis_cache_entry *entry = obj;
+
+ if (!cache_dump->type || entry->key.type == cache_dump->type) {
+ if (cache_entry_dump(cache_dump->container, entry)) {
+ ao2_cleanup(cache_dump->container);
+ cache_dump->container = NULL;
+ return CMP_STOP;
+ }
+ }
+
+ return 0;
+}
+
+struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
+{
+ struct cache_dump_data cache_dump;
+
+ ast_assert(cache != NULL);
+ ast_assert(cache->entries != NULL);
+
+ cache_dump.eid = NULL;
+ cache_dump.type = type;
+ cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
+ if (!cache_dump.container) {
+ return NULL;
+ }
+
+ ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
+ return cache_dump.container;
}
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
@@ -781,8 +816,9 @@
}
if (snapshots.aggregate_old != snapshots.aggregate_new) {
- if (snapshots.aggregate_new && caching_topic->cache->aggregate_post_fn) {
- caching_topic->cache->aggregate_post_fn(snapshots.aggregate_new);
+ if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
+ caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
+ snapshots.aggregate_new);
}
update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
if (update) {
Modified: team/rmudgett/stasis_cache/tests/test_devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_devicestate.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_devicestate.c (original)
+++ team/rmudgett/stasis_cache/tests/test_devicestate.c Tue Mar 4 19:59:50 2014
@@ -403,7 +403,7 @@
struct ao2_container *cache_dump;
/* remove all device states created during this test */
- cache_dump = stasis_cache_dump_full(ast_device_state_cache(), NULL, NULL);
+ cache_dump = stasis_cache_dump_all(ast_device_state_cache(), NULL);
if (!cache_dump) {
return;
}
Modified: team/rmudgett/stasis_cache/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_stasis.c?view=diff&rev=409698&r1=409697&r2=409698
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_stasis.c (original)
+++ team/rmudgett/stasis_cache/tests/test_stasis.c Tue Mar 4 19:59:50 2014
@@ -94,11 +94,13 @@
AST_TEST_DEFINE(message)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
RAII_VAR(char *, data, NULL, ao2_cleanup);
char *expected = "SomeData";
struct timeval expected_timestamp;
struct timeval time_diff;
+ struct ast_eid foreign_eid;
switch (cmd) {
case TEST_INIT:
@@ -112,29 +114,42 @@
}
+ memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
+
type = stasis_message_type_create("SomeMessage", NULL);
- ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
- ast_test_validate(test, NULL == stasis_message_create(type, NULL));
+ ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
+ ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
data = ao2_alloc(strlen(expected) + 1, NULL);
- strcpy(data, expected);
+ strcpy(data, expected);/* Safe */
expected_timestamp = ast_tvnow();
- uut = stasis_message_create(type, data);
-
- ast_test_validate(test, NULL != uut);
- ast_test_validate(test, type == stasis_message_type(uut));
- ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
- ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
-
- time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
+ uut1 = stasis_message_create_full(type, data, &foreign_eid);
+ uut2 = stasis_message_create_full(type, data, NULL);
+
+ ast_test_validate(test, NULL != uut1);
+ ast_test_validate(test, NULL != uut2);
+ ast_test_validate(test, type == stasis_message_type(uut1));
+ ast_test_validate(test, type == stasis_message_type(uut2));
+ ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
+ ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
+ ast_test_validate(test, NULL != stasis_message_eid(uut1));
+ ast_test_validate(test, NULL == stasis_message_eid(uut2));
+ ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
+
+ ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
+
+ time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
/* 10ms is certainly long enough for the two calls to complete */
ast_test_validate(test, time_diff.tv_sec == 0);
ast_test_validate(test, time_diff.tv_usec < 10000);
- ao2_ref(uut, -1);
- uut = NULL;
- ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
+ ao2_ref(uut1, -1);
+ uut1 = NULL;
+ ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
+ ao2_ref(uut2, -1);
+ uut2 = NULL;
+ ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
return AST_TEST_PASS;
}
@@ -643,11 +658,12 @@
static void cache_test_data_dtor(void *obj)
{
struct cache_test_data *data = obj;
+
ast_free(data->id);
ast_free(data->value);
}
-static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
+static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
{
RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
@@ -665,7 +681,12 @@
return NULL;
}
- return stasis_message_create(type, data);
+ return stasis_message_create_full(type, data, eid);
+}
+
+static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
+{
+ return cache_test_message_create_full(type, name, value, &ast_eid_default);
}
static const char *cache_test_data_id(struct stasis_message *message)
@@ -676,6 +697,81 @@
return NULL;
}
return cachable->id;
+}
+
+static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
+{
+ struct stasis_message *aggregate_snapshot;
+ struct stasis_message *snapshot;
+ struct stasis_message_type *type = NULL;
+ struct cache_test_data *test_data = NULL;
+ int idx;
+ int accumulated = 0;
+ char aggregate_str[30];
+
+ /* Accumulate the aggregate value. */
+ snapshot = stasis_cache_entry_get_local(entry);
+ if (snapshot) {
+ type = stasis_message_type(snapshot);
+ test_data = stasis_message_data(snapshot);
+ accumulated += atoi(test_data->value);
+ }
+ for (idx = 0; ; ++idx) {
+ snapshot = stasis_cache_entry_get_remote(entry, idx);
+ if (!snapshot) {
+ break;
+ }
+
+ type = stasis_message_type(snapshot);
+ test_data = stasis_message_data(snapshot);
+ accumulated += atoi(test_data->value);
+ }
+
+ if (!test_data) {
+ /* There are no test entries cached. Delete the aggregate. */
+ return NULL;
+ }
+
+ snapshot = stasis_cache_entry_get_aggregate(entry);
+ if (snapshot) {
+ type = stasis_message_type(snapshot);
+ test_data = stasis_message_data(snapshot);
+ if (accumulated == atoi(test_data->value)) {
+ /* Aggregate test entry did not change. */
+ return ao2_bump(snapshot);
+ }
+ }
+
+ snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
+ aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
+ if (!aggregate_snapshot) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
+ return ao2_bump(snapshot);
+ }
+
+ return aggregate_snapshot;
+}
+
+static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
+{
+ stasis_publish(topic, aggregate);
+}
+
+static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
+{
+ RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
+ struct cache_test_data *test_data;
+
+ aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
+ if (!aggregate) {
+ /* No aggregate, return true if given no value. */
+ return !value;
+ }
+
+ /* Return true if the given value matches the aggregate value. */
+ test_data = stasis_message_data(aggregate);
+ return value && !strcmp(value, test_data->value);
}
AST_TEST_DEFINE(cache_filter)
@@ -845,8 +941,8 @@
case TEST_INIT:
info->name = __func__;
info->category = test_category;
- info->summary = "Test passing messages through cache topic unscathed.";
- info->description = "Test passing messages through cache topic unscathed.";
+ info->summary = "Test cache dump routines.";
+ info->description = "Test cache dump routines.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
@@ -933,6 +1029,266 @@
ao2_cleanup(cache_dump);
cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
ast_test_validate(test, 0 == ao2_container_count(cache_dump));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache_eid_aggregate)
+{
+ RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+ RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
+ int actual_len;
+ struct ao2_iterator i;
+ void *obj;
+ struct ast_eid foreign_eid1;
+ struct ast_eid foreign_eid2;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test cache eid and aggregate support.";
+ info->description = "Test cache eid and aggregate support.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
+ memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
+
+ cache_type = stasis_message_type_create("Cacheable", NULL);
+ ast_test_validate(test, NULL != cache_type);
+
+ topic = stasis_topic_create("SomeTopic");
+ ast_test_validate(test, NULL != topic);
+
+ /* To consume events published to the topic. */
+ topic_consumer = consumer_create(1);
+ ast_test_validate(test, NULL != topic_consumer);
+
+ topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
+ ast_test_validate(test, NULL != topic_sub);
+ ao2_ref(topic_consumer, +1);
+
+ cache = stasis_cache_create_full(cache_test_data_id,
+ cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
+ ast_test_validate(test, NULL != cache);
+
+ caching_topic = stasis_caching_topic_create(topic, cache);
+ ast_test_validate(test, NULL != caching_topic);
+
+ /* To consume update events published to the caching_topic. */
+ cache_consumer = consumer_create(1);
+ ast_test_validate(test, NULL != cache_consumer);
+
+ cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
+ ast_test_validate(test, NULL != cache_sub);
+ ao2_ref(cache_consumer, +1);
+
+ /* Create test messages. */
+ test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
+ ast_test_validate(test, NULL != test_message1_1);
+ test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
+ ast_test_validate(test, NULL != test_message2_1);
+ test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
+ ast_test_validate(test, NULL != test_message2_2);
+ test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
+ ast_test_validate(test, NULL != test_message2_3);
+ test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
+ ast_test_validate(test, NULL != test_message2_4);
+
+ /* Post some snapshots */
+ stasis_publish(topic, test_message1_1);
+ ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
+ stasis_publish(topic, test_message2_1);
+ ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
+ stasis_publish(topic, test_message2_2);
+ ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
+
+ actual_len = consumer_wait_for(cache_consumer, 6);
+ ast_test_validate(test, 6 == actual_len);
+ actual_len = consumer_wait_for(topic_consumer, 6);
+ ast_test_validate(test, 6 == actual_len);
+
+ /* Check the cache */
+ ao2_cleanup(cache_dump);
+ cache_dump = stasis_cache_dump_all(cache, NULL);
+ ast_test_validate(test, NULL != cache_dump);
+ ast_test_validate(test, 3 == ao2_container_count(cache_dump));
+ i = ao2_iterator_init(cache_dump, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
+
+ ast_test_validate(test,
+ actual_cache_entry == test_message1_1
+ || actual_cache_entry == test_message2_1
+ || actual_cache_entry == test_message2_2);
+ }
+ ao2_iterator_destroy(&i);
+
+ /* Check the local cached items */
+ ao2_cleanup(cache_dump);
+ cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
+ ast_test_validate(test, NULL != cache_dump);
+ ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+ i = ao2_iterator_init(cache_dump, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
+
+ ast_test_validate(test,
+ actual_cache_entry == test_message1_1
+ || actual_cache_entry == test_message2_1);
+ }
+ ao2_iterator_destroy(&i);
+
+ /* Post snapshot 2 from another eid. */
+ stasis_publish(topic, test_message2_3);
+ ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
+
+ actual_len = consumer_wait_for(cache_consumer, 8);
+ ast_test_validate(test, 8 == actual_len);
+ actual_len = consumer_wait_for(topic_consumer, 8);
+ ast_test_validate(test, 8 == actual_len);
+
+ /* Check the cache */
+ ao2_cleanup(cache_dump);
+ cache_dump = stasis_cache_dump_all(cache, NULL);
+ ast_test_validate(test, NULL != cache_dump);
+ ast_test_validate(test, 4 == ao2_container_count(cache_dump));
+ i = ao2_iterator_init(cache_dump, 0);
+ while ((obj = ao2_iterator_next(&i))) {
+ RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
+
+ ast_test_validate(test,
+ actual_cache_entry == test_message1_1
+ || actual_cache_entry == test_message2_1
+ || actual_cache_entry == test_message2_2
+ || actual_cache_entry == test_message2_3);
+ }
+ ao2_iterator_destroy(&i);
+
+ /* Check the remote cached items */
+ ao2_cleanup(cache_dump);
+ cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
+ ast_test_validate(test, NULL != cache_dump);
+ ast_test_validate(test, 1 == ao2_container_count(cache_dump));
+ i = ao2_iterator_init(cache_dump, 0);
+ while ((obj = ao2_iterator_next(&i))) {
[... 120 lines stripped ...]
More information about the asterisk-commits
mailing list