[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r408837 - in /team/rmudgett/stasis_cac...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Feb 21 22:18:58 CST 2014
Author: rmudgett
Date: Fri Feb 21 22:18:53 2014
New Revision: 408837
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=408837
Log:
stasis: Add eid support to stasis messaging and cache.
* Convert all stasis messages to have an eid to identify its source.
* Make stasis cache store items with the same id but from different source
eids in the same cache entry.
Note: The device state and mwi state events are published to stasis with
the eid source and consumers need to be updated to get an aggregated state
for a specific device and mwi. The device_state_aggregation_test unit
test currently fails as a result.
Modified:
team/rmudgett/stasis_cache/include/asterisk/stasis.h
team/rmudgett/stasis_cache/main/app.c
team/rmudgett/stasis_cache/main/devicestate.c
team/rmudgett/stasis_cache/main/stasis_cache.c
team/rmudgett/stasis_cache/main/stasis_message.c
team/rmudgett/stasis_cache/tests/test_devicestate.c
Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Fri Feb 21 22:18:53 2014
@@ -290,11 +290,39 @@
*
* \param type Type of the message
* \param data Immutable data that is the actual contents of the message
+ *
* \return New message
* \return \c NULL on error
+ *
* \since 12
*/
struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
+
+/*!
+ * \brief Create a new message for an entity.
+ *
+ * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
+ * with it. Messages are also immutable, and must not be modified after they
+ * are initialized. Especially the \a data in the message.
+ *
+ * \param type Type of the message
+ * \param data Immutable data that is the actual contents of the message
+ * \param eid What entity originated this message.
+ *
+ * \return New message
+ * \return \c NULL on error
+ */
+struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
+
+/*!
+ * \brief Get the entity id for a \ref stasis_message.
+ *
+ * \param msg Message to get eid.
+ *
+ * \return Entity id of \a msg
+ * \return \c NULL if \a msg is \c NULL.
+ */
+const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
/*!
* \brief Get the message type for a \ref stasis_message.
@@ -749,31 +777,72 @@
struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
/*!
- * \brief Retrieve an item from the cache.
+ * \brief Retrieve an item from the cache for the ast_eid_default entity.
*
* The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
*
* \param cache The cache to query.
* \param type Type of message to retrieve.
* \param id Identity of the snapshot to retrieve.
- * \return Message from the cache.
- * \return \c NULL if message is not found.
- * \since 12
- */
-struct stasis_message *stasis_cache_get(
- struct stasis_cache *cache, struct stasis_message_type *type,
- const char *id);
-
-/*!
- * \brief Dump cached items to a subscription
+ *
+ * \retval Message from the cache.
+ * \retval \c NULL if message is not found.
+ *
+ * \since 12
+ */
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Retrieve an item from the cache for a specific entity.
+ *
+ * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ * \param eid Specific entity id to retrieve. NULL if any entity, but prefer internal.
+ *
+ * \retval Message from the cache.
+ * \retval \c NULL if message is not found.
+ */
+struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
+
+/*!
+ * \brief Retrieve all matching items from the cache.
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to retrieve.
+ * \param id Identity of the snapshot to retrieve.
+ *
+ * \retval Container of matching items found.
+ * \retval \c NULL if error.
+ */
+struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
+
+/*!
+ * \brief Dump cached items to a subscription for the ast_eid_default entity.
+ *
* \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
- * \return ao2_container containing all matches (must be unreffed by caller)
- * \return \c NULL on allocation error
- * \since 12
- */
-struct ao2_container *stasis_cache_dump(struct stasis_cache *cache,
- struct stasis_message_type *type);
+ *
+ * \retval ao2_container containing all matches (must be unreffed by caller)
+ * \retval \c NULL on allocation error
+ *
+ * \since 12
+ */
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
+
+/*!
+ * \brief Dump cached items to a subscription for a specific entity.
+ *
+ * \param cache The cache to query.
+ * \param type Type of message to dump (any type if \c NULL).
+ * \param eid Specific entity id to retrieve. NULL if any entities.
+ *
+ * \retval ao2_container containing all matches (must be unreffed by caller)
+ * \retval \c NULL on allocation error
+ */
+struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
/*! @} */
Modified: team/rmudgett/stasis_cache/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/app.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/app.c (original)
+++ team/rmudgett/stasis_cache/main/app.c Fri Feb 21 22:18:53 2014
@@ -2821,7 +2821,7 @@
return mwi_state;
}
-
+/* BUGBUG mwi_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
int ast_publish_mwi_state_full(
const char *mailbox,
const char *context,
@@ -2857,10 +2857,13 @@
if (eid) {
mwi_state->eid = *eid;
} else {
- ast_set_default_eid(&mwi_state->eid);
- }
-
- message = stasis_message_create(ast_mwi_state_type(), mwi_state);
+ mwi_state->eid = ast_eid_default;
+ }
+
+ message = stasis_message_create_full(ast_mwi_state_type(), mwi_state, &mwi_state->eid);
+ if (!message) {
+ return -1;
+ }
mailbox_specific_topic = ast_mwi_topic(mwi_state->uniqueid);
if (!mailbox_specific_topic) {
@@ -2911,7 +2914,7 @@
ao2_ref(obj->mwi_state, +1);
obj->blob = ast_json_ref(blob);
- msg = stasis_message_create(message_type, obj);
+ msg = stasis_message_create_full(message_type, obj, &mwi_state->eid);
if (!msg) {
return NULL;
}
Modified: team/rmudgett/stasis_cache/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/devicestate.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/devicestate.c (original)
+++ team/rmudgett/stasis_cache/main/devicestate.c Fri Feb 21 22:18:53 2014
@@ -750,6 +750,7 @@
return 0;
}
+/* BUGBUG device_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
int ast_publish_device_state_full(
const char *device,
enum ast_device_state state,
@@ -767,7 +768,11 @@
return -1;
}
- message = stasis_message_create(ast_device_state_message_type(), device_state);
+ message = stasis_message_create_full(ast_device_state_message_type(), device_state,
+ eid ?: &ast_eid_default);
+ if (!message) {
+ return -1;
+ }
device_specific_topic = ast_device_state_topic(device);
if (!device_specific_topic) {
Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Fri Feb 21 22:18:53 2014
@@ -36,6 +36,7 @@
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/utils.h"
+#include "asterisk/vector.h"
#ifdef LOW_MEMORY
#define NUM_CACHE_BUCKETS 17
@@ -131,27 +132,42 @@
struct cache_entry {
struct cache_entry_key key;
- struct stasis_message *snapshot;
+ /*! Internal snapshot of the stasis event. */
+ struct stasis_message *internal;
+ /*! External snapshots of the stasis event. */
+ AST_VECTOR(, struct stasis_message *) external;
};
static void cache_entry_dtor(void *obj)
{
struct cache_entry *entry = obj;
+ size_t idx;
ao2_cleanup(entry->key.type);
entry->key.type = NULL;
ast_free((char *) entry->key.id);
entry->key.id = NULL;
- ao2_cleanup(entry->snapshot);
- entry->snapshot = NULL;
+
+ ao2_cleanup(entry->internal);
+ entry->internal = NULL;
+
+ for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+ struct stasis_message *external;
+
+ external = AST_VECTOR_GET(&entry->external, idx);
+ ao2_cleanup(external);
+ }
+ AST_VECTOR_FREE(&entry->external);
}
static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
{
struct cache_entry *entry;
+ int is_external;
ast_assert(type != NULL);
ast_assert(id != NULL);
+ ast_assert(snapshot != NULL);
entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
@@ -166,7 +182,21 @@
}
entry->key.type = ao2_bump(type);
- entry->snapshot = ao2_bump(snapshot);
+ is_external = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
+ if (AST_VECTOR_INIT(&entry->external, is_external)) {
+ ao2_cleanup(entry);
+ return NULL;
+ }
+
+ if (is_external) {
+ if (AST_VECTOR_APPEND(&entry->external, snapshot)) {
+ ao2_cleanup(entry);
+ return NULL;
+ }
+ } else {
+ entry->internal = snapshot;
+ }
+ ao2_bump(snapshot);
return entry;
}
@@ -264,8 +294,91 @@
return cache;
}
+/*!
+ * \internal
+ * \brief Remove the stasis snapshot in the cache entry determined by eid.
+ *
+ * \param entries Container of cached entries.
+ * \param cached_entry The entry to remove the snapshot from.
+ * \param eid Which snapshot in the cached entry.
+ *
+ * \note The entries container is already locked.
+ *
+ * \return Previous stasis entry snapshot.
+ */
+static struct stasis_message *cache_remove(struct ao2_container *entries, struct cache_entry *cached_entry, const struct ast_eid *eid)
+{
+ struct stasis_message *old_snapshot;
+ int is_external;
+
+ is_external = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_external) {
+ old_snapshot = cached_entry->internal;
+ cached_entry->internal = NULL;
+ } else {
+ int idx;
+
+ old_snapshot = NULL;
+ for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+ struct stasis_message *cur;
+
+ cur = AST_VECTOR_GET(&cached_entry->external, idx);
+ if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+ old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+ break;
+ }
+ }
+ }
+
+ if (!cached_entry->internal && !AST_VECTOR_SIZE(&cached_entry->external)) {
+ ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
+ }
+
+ return old_snapshot;
+}
+
+/*!
+ * \internal
+ * \brief Update the stasis snapshot in the cache entry determined by eid.
+ *
+ * \param cached_entry The entry to remove the snapshot from.
+ * \param eid Which snapshot in the cached entry.
+ * \param new_snapshot Snapshot to replace the old snapshot.
+ *
+ * \return Previous stasis entry snapshot.
+ */
+static struct stasis_message *cache_udpate(struct cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
+{
+ struct stasis_message *old_snapshot;
+ int is_external;
+ int idx;
+
+ is_external = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_external) {
+ old_snapshot = cached_entry->internal;
+ cached_entry->internal = ao2_bump(new_snapshot);
+ return old_snapshot;
+ }
+
+ old_snapshot = NULL;
+ for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->external); ++idx) {
+ struct stasis_message *cur;
+
+ cur = AST_VECTOR_GET(&cached_entry->external, idx);
+ if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+ old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->external, idx);
+ break;
+ }
+ }
+ if (!AST_VECTOR_APPEND(&cached_entry->external, new_snapshot)) {
+ ao2_bump(new_snapshot);
+ }
+
+ return old_snapshot;
+}
+
static struct stasis_message *cache_put(struct stasis_cache *cache,
- struct stasis_message_type *type, const char *id,
+ struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
struct stasis_message *new_snapshot)
{
struct cache_entry_key search_key;
@@ -273,6 +386,7 @@
struct stasis_message *old_snapshot = NULL;
ast_assert(cache->entries != NULL);
+ ast_assert(eid != NULL);
ast_assert(new_snapshot == NULL ||
type == stasis_message_type(new_snapshot));
@@ -280,27 +394,21 @@
search_key.type = type;
search_key.id = id;
-
- if (new_snapshot == NULL) {
- /* Remove entry from cache */
- cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NOLOCK);
+ cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+ if (!new_snapshot) {
+ /* Remove snapshot from cache */
if (cached_entry) {
- old_snapshot = cached_entry->snapshot;
- cached_entry->snapshot = NULL;
- }
+ old_snapshot = cache_remove(cache->entries, cached_entry, eid);
+ }
+ } else if (cached_entry) {
+ /* Update snapshot in cache */
+ old_snapshot = cache_udpate(cached_entry, eid, new_snapshot);
} else {
- /* Insert/update cache */
- cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ /* Insert into the cache */
+ cached_entry = cache_entry_create(type, id, new_snapshot);
if (cached_entry) {
- /* Update cache. Because objects are moving, no need to update refcounts. */
- old_snapshot = cached_entry->snapshot;
- cached_entry->snapshot = ao2_bump(new_snapshot);
- } else {
- /* Insert into the cache */
- cached_entry = cache_entry_create(type, id, new_snapshot);
- if (cached_entry) {
- ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
- }
+ ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
}
}
@@ -310,25 +418,136 @@
return old_snapshot;
}
-struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
+/*!
+ * \internal
+ * \brief Dump all snapshots in the cache entry into the given container.
+ *
+ * \param snapshots Container to put all snapshots in the cache entry.
+ * \param entry Cache entry to use.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int cache_entry_dump(struct ao2_container *snapshots, const struct cache_entry *entry)
+{
+ int idx;
+ int err = 0;
+
+ ast_assert(snapshots != NULL);
+ ast_assert(entry != NULL);
+
+ if (entry->internal) {
+ err |= !ao2_link(snapshots, entry->internal);
+ }
+
+ for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+ struct stasis_message *snapshot;
+
+ snapshot = AST_VECTOR_GET(&entry->external, idx);
+ err |= !ao2_link(snapshots, snapshot);
+ }
+
+ return err;
+}
+
+struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
{
struct cache_entry_key search_key;
struct cache_entry *cached_entry;
- struct stasis_message *snapshot = NULL;
+ struct ao2_container *found;
ast_assert(cache != NULL);
ast_assert(cache->entries != NULL);
ast_assert(type != NULL);
ast_assert(id != NULL);
+ found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
+ if (!found) {
+ return NULL;
+ }
+
ao2_rdlock(cache->entries);
search_key.type = type;
search_key.id = id;
-
+ cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ if (cached_entry && cache_entry_dump(found, cached_entry)) {
+ ao2_cleanup(found);
+ found = NULL;
+ }
+
+ ao2_unlock(cache->entries);
+
+ ao2_cleanup(cached_entry);
+ return found;
+}
+
+/*!
+ * \internal
+ * \brief Retrieve an item from the cache entry for a specific eid.
+ *
+ * \param entry Cache entry to use.
+ * \param eid Specific entity id to retrieve. NULL if any entity, but prefer internal.
+ *
+ * \note The returned snapshot has not had its reference bumped.
+ *
+ * \retval Snapshot from the cache.
+ * \retval \c NULL if snapshot is not found.
+ */
+static struct stasis_message *cache_entry_by_eid(const struct cache_entry *entry, const struct ast_eid *eid)
+{
+ struct stasis_message *snapshot = NULL;
+
+ if (eid) {
+ int is_external;
+
+ /* Get snapshot with specific eid. */
+ is_external = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_external) {
+ snapshot = entry->internal;
+ } else {
+ int idx;
+
+ for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+ struct stasis_message *cur;
+
+ cur = AST_VECTOR_GET(&entry->external, idx);
+ if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+ snapshot = cur;
+ break;
+ }
+ }
+ }
+ } else {
+ /* Get any snapshot, but prefer internal. */
+ if (entry->internal) {
+ snapshot = entry->internal;
+ } else if (AST_VECTOR_SIZE(&entry->external)) {
+ snapshot = AST_VECTOR_GET(&entry->external, 0);
+ }
+ }
+
+ return snapshot;
+}
+
+struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
+{
+ struct cache_entry_key search_key;
+ struct cache_entry *cached_entry;
+ struct stasis_message *snapshot = NULL;
+
+ ast_assert(cache != NULL);
+ ast_assert(cache->entries != NULL);
+ ast_assert(type != NULL);
+ ast_assert(id != NULL);
+
+ ao2_rdlock(cache->entries);
+
+ search_key.type = type;
+ search_key.id = id;
cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (cached_entry) {
- snapshot = cached_entry->snapshot;
+ snapshot = cache_entry_by_eid(cached_entry, eid);
ao2_bump(snapshot);
}
@@ -336,11 +555,17 @@
ao2_cleanup(cached_entry);
return snapshot;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
+{
+ return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
}
struct cache_dump_data {
struct ao2_container *cached;
struct stasis_message_type *type;
+ const struct ast_eid *eid;
};
static int cache_dump_cb(void *obj, void *arg, int flags)
@@ -349,27 +574,42 @@
struct cache_entry *entry = obj;
if (!cache_dump->type || entry->key.type == cache_dump->type) {
- ao2_link(cache_dump->cached, entry->snapshot);
+ if (cache_dump->eid) {
+ struct stasis_message *snapshot;
+
+ snapshot = cache_entry_by_eid(entry, cache_dump->eid);
+ if (snapshot) {
+ ao2_link(cache_dump->cached, snapshot);
+ }
+ } else {
+ cache_entry_dump(cache_dump->cached, entry);
+ }
}
return 0;
}
-struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
+struct ao2_container *stasis_cache_dump_full(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
{
struct cache_dump_data cache_dump;
+ ast_assert(cache != NULL);
ast_assert(cache->entries != NULL);
+ cache_dump.eid = eid;
cache_dump.type = type;
- cache_dump.cached = ao2_container_alloc_options(
- AO2_ALLOC_OPT_LOCK_NOLOCK, 1, NULL, NULL);
+ cache_dump.cached = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
if (!cache_dump.cached) {
return NULL;
}
ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
return cache_dump.cached;
+}
+
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
+{
+ return stasis_cache_dump_full(cache, type, &ast_eid_default);
}
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
@@ -453,7 +693,8 @@
if (clear_id) {
struct stasis_message *old_snapshot;
- old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
+ old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id,
+ stasis_message_eid(clear_msg), NULL);
if (old_snapshot) {
struct stasis_message *update;
@@ -479,7 +720,8 @@
struct stasis_message *old_snapshot;
struct stasis_message *update;
- old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
+ old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id,
+ stasis_message_eid(message), message);
update = update_create(old_snapshot, message);
if (update) {
Modified: team/rmudgett/stasis_cache/main/stasis_message.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_message.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_message.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_message.c Fri Feb 21 22:18:53 2014
@@ -53,7 +53,7 @@
struct stasis_message_type *stasis_message_type_create(const char *name,
struct stasis_message_vtable *vtable)
{
- RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
+ struct stasis_message_type *type;
type = ao2_alloc(sizeof(*type), message_type_dtor);
if (!type) {
@@ -66,11 +66,11 @@
type->name = ast_strdup(name);
if (!type->name) {
+ ao2_cleanup(type);
return NULL;
}
type->vtable = vtable;
- ao2_ref(type, +1);
return type;
}
@@ -87,6 +87,8 @@
struct stasis_message_type *type;
/*! Message content */
void *data;
+ /*! Where this message originated. */
+ struct ast_eid eid;
};
static void stasis_message_dtor(void *obj)
@@ -96,11 +98,11 @@
ao2_cleanup(message->data);
}
-struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid)
{
- RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+ struct stasis_message *message;
- if (type == NULL || data == NULL) {
+ if (type == NULL || data == NULL || eid == NULL) {
return NULL;
}
@@ -114,9 +116,22 @@
message->type = type;
ao2_ref(data, +1);
message->data = data;
+ message->eid = *eid;
- ao2_ref(message, +1);
return message;
+}
+
+struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data)
+{
+ return stasis_message_create_full(type, data, &ast_eid_default);
+}
+
+const struct ast_eid *stasis_message_eid(const struct stasis_message *msg)
+{
+ if (msg == NULL) {
+ return NULL;
+ }
+ return &msg->eid;
}
struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)
Modified: team/rmudgett/stasis_cache/tests/test_devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_devicestate.c?view=diff&rev=408837&r1=408836&r2=408837
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_devicestate.c (original)
+++ team/rmudgett/stasis_cache/tests/test_devicestate.c Fri Feb 21 22:18:53 2014
@@ -401,6 +401,7 @@
ao2_callback(cache_dump, 0, remove_device_states_cb, NULL);
}
+/* BUGBUG this test currently fails because the device state is storing the information in a different place now. */
AST_TEST_DEFINE(device_state_aggregation_test)
{
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
More information about the asterisk-commits
mailing list