[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r408874 - in /team/rmudgett/stasis_cac...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Feb 24 18:15:19 CST 2014
Author: rmudgett
Date: Mon Feb 24 18:15:17 2014
New Revision: 408874
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=408874
Log:
stasis: Add support for aggregate cache and messages.
Modified:
team/rmudgett/stasis_cache/include/asterisk/stasis.h
team/rmudgett/stasis_cache/main/app.c
team/rmudgett/stasis_cache/main/stasis_cache.c
team/rmudgett/stasis_cache/main/stasis_message.c
Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Mon Feb 24 18:15:17 2014
@@ -307,7 +307,7 @@
*
* \param type Type of the message
* \param data Immutable data that is the actual contents of the message
- * \param eid What entity originated this message.
+ * \param eid What entity originated this message. (NULL for aggregate)
*
* \return New message
* \return \c NULL on error
@@ -319,8 +319,8 @@
*
* \param msg Message to get eid.
*
- * \return Entity id of \a msg
- * \return \c NULL if \a msg is \c NULL.
+ * \retval Entity id of \a msg
+ * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
*/
const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
@@ -668,6 +668,9 @@
*/
struct stasis_cache;
+/*! Cache entry used for calculating the aggregate snapshot. */
+struct stasis_cache_entry;
+
/*!
* \brief A topic wrapper, which caches certain messages.
* \since 12
@@ -689,6 +692,61 @@
typedef const char *(*snapshot_get_id)(struct stasis_message *message);
/*!
+ * \brief Callback to calculate the aggregate cache entry.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to calculate a new aggregate snapshot.
+ * \param new_snapshot The shapshot that is being updated.
+ *
+ * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
+ * if a new aggregate could not be calculated because of error.
+ *
+ * \return New aggregate-snapshot calculated on success.
+ * Caller has a reference on return.
+ */
+typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
+
+/*!
+ * \brief Get the aggregate cache entry snapshot.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to get the aggregate snapshot.
+ *
+ * \note A reference is not given to the returned pointer so don't unref it.
+ *
+ * \retval Aggregate-snapshot in cache.
+ * \retval NULL if not present.
+ */
+struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
+
+/*!
+ * \brief Get the internal cache entry snapshot.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to get the internal snapshot.
+ *
+ * \note A reference is not given to the returned pointer so don't unref it.
+ *
+ * \retval Internal-snapshot in cache.
+ * \retval NULL if not present.
+ */
+struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry);
+
+/*!
+ * \brief Get the external cache entry snapshot by index.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to get the external snapshot.
+ * \param idx Which external snapshot to get.
+ *
+ * \note A reference is not given to the returned pointer so don't unref it.
+ *
+ * \retval External-snapshot in cache.
+ * \retval NULL if not present.
+ */
+struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx);
+
+/*!
* \brief Create a cache.
*
* This is the backend store for a \ref stasis_caching_topic. The cache is
@@ -697,11 +755,31 @@
* The returned object is AO2 managed, so ao2_cleanup() when you're done.
*
* \param id_fn Callback to extract the id from a snapshot message.
- * \return New cache indexed by \a id_fn.
- * \return \c NULL on error
+ *
+ * \retval New cache indexed by \a id_fn.
+ * \retval \c NULL on error
+ *
* \since 12
*/
struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
+
+/*!
+ * \brief Create a cache.
+ *
+ * This is the backend store for a \ref stasis_caching_topic. The cache is
+ * thread safe, allowing concurrent reads and writes.
+ *
+ * The returned object is AO2 managed, so ao2_cleanup() when you're done.
+ *
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \param aggregate_fn Callback to calculate the aggregate cache entry.
+ *
+ * \retval New cache indexed by \a id_fn.
+ * \retval \c NULL on error
+ *
+ * \since 12.2.0
+ */
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn);
/*!
* \brief Create a topic which monitors and caches messages from another topic.
@@ -800,7 +878,7 @@
* \param cache The cache to query.
* \param type Type of message to retrieve.
* \param id Identity of the snapshot to retrieve.
- * \param eid Specific entity id to retrieve. NULL if any entity, but prefer internal.
+ * \param eid Specific entity id to retrieve. NULL for aggregate.
*
* \retval Message from the cache.
* \retval \c NULL if message is not found.
Modified: team/rmudgett/stasis_cache/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/app.c?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/main/app.c (original)
+++ team/rmudgett/stasis_cache/main/app.c Mon Feb 24 18:15:17 2014
@@ -2822,6 +2822,7 @@
}
/* BUGBUG mwi_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
+/* BUGBUG mwi_state needs to be treated just like device state. It has an aggregate for multiple eid sources. */
int ast_publish_mwi_state_full(
const char *mailbox,
const char *context,
Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Mon Feb 24 18:15:17 2014
@@ -48,6 +48,7 @@
struct stasis_cache {
struct ao2_container *entries;
snapshot_get_id id_fn;
+ cache_aggregate_calc_fn aggregate_fn;
};
/*! \internal */
@@ -130,8 +131,10 @@
const char *id;
};
-struct cache_entry {
+struct stasis_cache_entry {
struct cache_entry_key key;
+ /*! Aggregate snapshot of the stasis cache. */
+ struct stasis_message *aggregate;
/*! Internal snapshot of the stasis event. */
struct stasis_message *internal;
/*! External snapshots of the stasis event. */
@@ -140,7 +143,7 @@
static void cache_entry_dtor(void *obj)
{
- struct cache_entry *entry = obj;
+ struct stasis_cache_entry *entry = obj;
size_t idx;
ao2_cleanup(entry->key.type);
@@ -148,6 +151,8 @@
ast_free((char *) entry->key.id);
entry->key.id = NULL;
+ ao2_cleanup(entry->aggregate);
+ entry->aggregate = NULL;
ao2_cleanup(entry->internal);
entry->internal = NULL;
@@ -160,9 +165,9 @@
AST_VECTOR_FREE(&entry->external);
}
-static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
-{
- struct cache_entry *entry;
+static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
+{
+ struct stasis_cache_entry *entry;
int is_external;
ast_assert(type != NULL);
@@ -203,7 +208,7 @@
static int cache_entry_hash(const void *obj, int flags)
{
- const struct cache_entry *object;
+ const struct stasis_cache_entry *object;
const struct cache_entry_key *key;
int hash = 0;
@@ -228,8 +233,8 @@
static int cache_entry_cmp(void *obj, void *arg, int flags)
{
- const struct cache_entry *object_left = obj;
- const struct cache_entry *object_right = arg;
+ const struct stasis_cache_entry *object_left = obj;
+ const struct stasis_cache_entry *object_right = arg;
const struct cache_entry_key *right_key = obj;
int cmp;
@@ -272,7 +277,7 @@
cache->entries = NULL;
}
-struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn)
{
struct stasis_cache *cache;
@@ -290,8 +295,54 @@
}
cache->id_fn = id_fn;
+ cache->aggregate_fn = aggregate_fn;
return cache;
+}
+
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+{
+ return stasis_cache_create_full(id_fn, NULL);
+}
+
+struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
+{
+ return entry->aggregate;
+}
+
+struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry)
+{
+ return entry->internal;
+}
+
+struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx)
+{
+ if (idx < AST_VECTOR_SIZE(&entry->external)) {
+ return AST_VECTOR_GET(&entry->external, idx);
+ }
+ return NULL;
+}
+
+/*!
+ * \internal
+ * \brief Find the cache entry in the cache entries container.
+ *
+ * \param entries Container of cached entries.
+ * \param type Type of message to retrieve the cache entry.
+ * \param id Identity of the snapshot to retrieve the cache entry.
+ *
+ * \note The entries container is already locked.
+ *
+ * \retval Cache-entry on success.
+ * \retval NULL Not in cache.
+ */
+static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
+{
+ struct cache_entry_key search_key;
+
+ search_key.type = type;
+ search_key.id = id;
+ return ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
}
/*!
@@ -306,7 +357,7 @@
*
* \return Previous stasis entry snapshot.
*/
-static struct stasis_message *cache_remove(struct ao2_container *entries, struct cache_entry *cached_entry, const struct ast_eid *eid)
+static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
{
struct stasis_message *old_snapshot;
int is_external;
@@ -347,7 +398,7 @@
*
* \return Previous stasis entry snapshot.
*/
-static struct stasis_message *cache_udpate(struct cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
+static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
{
struct stasis_message *old_snapshot;
int is_external;
@@ -377,33 +428,42 @@
return old_snapshot;
}
-static struct stasis_message *cache_put(struct stasis_cache *cache,
+struct cache_put_snapshots {
+ /*! Old cache eid snapshot. */
+ struct stasis_message *old;
+ /*! Old cache aggregate snapshot. */
+ struct stasis_message *aggregate_old;
+ /*! New cache aggregate snapshot. */
+ struct stasis_message *aggregate_new;
+};
+
+static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
struct stasis_message *new_snapshot)
{
- struct cache_entry_key search_key;
- struct cache_entry *cached_entry;
- struct stasis_message *old_snapshot = NULL;
+ struct stasis_cache_entry *cached_entry;
+ struct cache_put_snapshots snapshots;
ast_assert(cache->entries != NULL);
- ast_assert(eid != NULL);
+ ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
ast_assert(new_snapshot == NULL ||
type == stasis_message_type(new_snapshot));
+ memset(&snapshots, 0, sizeof(snapshots));
+
ao2_wrlock(cache->entries);
- search_key.type = type;
- search_key.id = id;
- cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
-
+ cached_entry = cache_find(cache->entries, type, id);
+
+ /* Update the eid snapshot. */
if (!new_snapshot) {
/* Remove snapshot from cache */
if (cached_entry) {
- old_snapshot = cache_remove(cache->entries, cached_entry, eid);
+ snapshots.old = cache_remove(cache->entries, cached_entry, eid);
}
} else if (cached_entry) {
/* Update snapshot in cache */
- old_snapshot = cache_udpate(cached_entry, eid, new_snapshot);
+ snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
} else {
/* Insert into the cache */
cached_entry = cache_entry_create(type, id, new_snapshot);
@@ -412,15 +472,22 @@
}
}
+ /* Update the aggregate snapshot. */
+ if (cache->aggregate_fn && cached_entry) {
+ snapshots.aggregate_new = cache->aggregate_fn(cached_entry, new_snapshot);
+ snapshots.aggregate_old = cached_entry->aggregate;
+ cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
+ }
+
ao2_unlock(cache->entries);
ao2_cleanup(cached_entry);
- return old_snapshot;
+ return snapshots;
}
/*!
* \internal
- * \brief Dump all snapshots in the cache entry into the given container.
+ * \brief Dump all entity snapshots in the cache entry into the given container.
*
* \param snapshots Container to put all snapshots in the cache entry.
* \param entry Cache entry to use.
@@ -428,7 +495,7 @@
* \retval 0 on success.
* \retval -1 on error.
*/
-static int cache_entry_dump(struct ao2_container *snapshots, const struct cache_entry *entry)
+static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
{
int idx;
int err = 0;
@@ -436,6 +503,8 @@
ast_assert(snapshots != NULL);
ast_assert(entry != NULL);
+ /* The aggregate snapshot is not a snapshot from an entity. */
+
if (entry->internal) {
err |= !ao2_link(snapshots, entry->internal);
}
@@ -452,8 +521,7 @@
struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
{
- struct cache_entry_key search_key;
- struct cache_entry *cached_entry;
+ struct stasis_cache_entry *cached_entry;
struct ao2_container *found;
ast_assert(cache != NULL);
@@ -468,9 +536,7 @@
ao2_rdlock(cache->entries);
- search_key.type = type;
- search_key.id = id;
- cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ cached_entry = cache_find(cache->entries, type, id);
if (cached_entry && cache_entry_dump(found, cached_entry)) {
ao2_cleanup(found);
found = NULL;
@@ -487,53 +553,44 @@
* \brief Retrieve an item from the cache entry for a specific eid.
*
* \param entry Cache entry to use.
- * \param eid Specific entity id to retrieve. NULL if any entity, but prefer internal.
+ * \param eid Specific entity id to retrieve. NULL for aggregate.
*
* \note The returned snapshot has not had its reference bumped.
*
* \retval Snapshot from the cache.
* \retval \c NULL if snapshot is not found.
*/
-static struct stasis_message *cache_entry_by_eid(const struct cache_entry *entry, const struct ast_eid *eid)
-{
- struct stasis_message *snapshot = NULL;
-
- if (eid) {
- int is_external;
-
- /* Get snapshot with specific eid. */
- is_external = ast_eid_cmp(eid, &ast_eid_default);
- if (!is_external) {
- snapshot = entry->internal;
- } else {
- int idx;
-
- for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
- struct stasis_message *cur;
-
- cur = AST_VECTOR_GET(&entry->external, idx);
- if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
- snapshot = cur;
- break;
- }
- }
- }
- } else {
- /* Get any snapshot, but prefer internal. */
- if (entry->internal) {
- snapshot = entry->internal;
- } else if (AST_VECTOR_SIZE(&entry->external)) {
- snapshot = AST_VECTOR_GET(&entry->external, 0);
- }
- }
-
- return snapshot;
+static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
+{
+ int is_external;
+ int idx;
+
+ if (!eid) {
+ /* Get aggregate. */
+ return entry->aggregate;
+ }
+
+ /* Get snapshot with specific eid. */
+ is_external = ast_eid_cmp(eid, &ast_eid_default);
+ if (!is_external) {
+ return entry->internal;
+ }
+
+ for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+ struct stasis_message *cur;
+
+ cur = AST_VECTOR_GET(&entry->external, idx);
+ if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+ return cur;
+ }
+ }
+
+ return NULL;
}
struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
{
- struct cache_entry_key search_key;
- struct cache_entry *cached_entry;
+ struct stasis_cache_entry *cached_entry;
struct stasis_message *snapshot = NULL;
ast_assert(cache != NULL);
@@ -543,9 +600,7 @@
ao2_rdlock(cache->entries);
- search_key.type = type;
- search_key.id = id;
- cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+ cached_entry = cache_find(cache->entries, type, id);
if (cached_entry) {
snapshot = cache_entry_by_eid(cached_entry, eid);
ao2_bump(snapshot);
@@ -571,7 +626,7 @@
static int cache_dump_cb(void *obj, void *arg, int flags)
{
struct cache_dump_data *cache_dump = arg;
- struct cache_entry *entry = obj;
+ struct stasis_cache_entry *entry = obj;
if (!cache_dump->type || entry->key.type == cache_dump->type) {
if (cache_dump->eid) {
@@ -669,9 +724,11 @@
static void caching_topic_exec(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
- RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+ struct stasis_caching_topic *caching_topic_needs_unref;
struct stasis_caching_topic *caching_topic = data;
- const char *id = NULL;
+ const char *id;
+ struct stasis_message *update;
+ struct cache_put_snapshots snapshots;
ast_assert(caching_topic != NULL);
ast_assert(caching_topic->topic != NULL);
@@ -680,6 +737,8 @@
if (stasis_subscription_final_message(sub, message)) {
caching_topic_needs_unref = caching_topic;
+ } else {
+ caching_topic_needs_unref = NULL;
}
/* Handle cache clear event */
@@ -691,45 +750,61 @@
ast_assert(clear_type != NULL);
if (clear_id) {
- struct stasis_message *old_snapshot;
-
- old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id,
+ snapshots = cache_put(caching_topic->cache, clear_type, clear_id,
stasis_message_eid(clear_msg), NULL);
- if (old_snapshot) {
- struct stasis_message *update;
-
- update = update_create(old_snapshot, NULL);
+ if (snapshots.old) {
+ update = update_create(snapshots.old, NULL);
if (update) {
stasis_publish(caching_topic->topic, update);
}
ao2_cleanup(update);
- ao2_cleanup(old_snapshot);
- return;
+ } else {
+ ast_log(LOG_ERROR,
+ "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
+ stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
}
- ast_log(LOG_ERROR,
- "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
- stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
- }
+ if (snapshots.aggregate_old != snapshots.aggregate_new) {
+ update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+ if (update) {
+ stasis_publish(caching_topic->topic, update);
+ }
+ ao2_cleanup(update);
+ }
+
+ ao2_cleanup(snapshots.old);
+ ao2_cleanup(snapshots.aggregate_old);
+ ao2_cleanup(snapshots.aggregate_new);
+ }
+ ao2_cleanup(caching_topic_needs_unref);
return;
}
id = caching_topic->cache->id_fn(message);
if (id) {
/* Update the cache */
- struct stasis_message *old_snapshot;
- struct stasis_message *update;
-
- old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id,
+ snapshots = cache_put(caching_topic->cache, stasis_message_type(message), id,
stasis_message_eid(message), message);
- update = update_create(old_snapshot, message);
+ update = update_create(snapshots.old, message);
if (update) {
stasis_publish(caching_topic->topic, update);
}
ao2_cleanup(update);
- ao2_cleanup(old_snapshot);
- }
+
+ if (snapshots.aggregate_old != snapshots.aggregate_new) {
+ update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+ if (update) {
+ stasis_publish(caching_topic->topic, update);
+ }
+ ao2_cleanup(update);
+ }
+
+ ao2_cleanup(snapshots.old);
+ ao2_cleanup(snapshots.aggregate_old);
+ ao2_cleanup(snapshots.aggregate_new);
+ }
+ ao2_cleanup(caching_topic_needs_unref);
}
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
Modified: team/rmudgett/stasis_cache/main/stasis_message.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_message.c?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_message.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_message.c Mon Feb 24 18:15:17 2014
@@ -85,6 +85,8 @@
struct timeval timestamp;
/*! Type of the message */
struct stasis_message_type *type;
+ /*! Where this message originated. NULL if aggregate message. */
+ const struct ast_eid *eid_ptr;
/*! Message content */
void *data;
/*! Where this message originated. */
@@ -102,7 +104,7 @@
{
struct stasis_message *message;
- if (type == NULL || data == NULL || eid == NULL) {
+ if (type == NULL || data == NULL) {
return NULL;
}
@@ -116,7 +118,10 @@
message->type = type;
ao2_ref(data, +1);
message->data = data;
- message->eid = *eid;
+ if (eid) {
+ message->eid_ptr = &message->eid;
+ message->eid = *eid;
+ }
return message;
}
@@ -131,7 +136,7 @@
if (msg == NULL) {
return NULL;
}
- return &msg->eid;
+ return msg->eid_ptr;
}
struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)
More information about the asterisk-commits
mailing list