[asterisk-commits] dlee: branch dlee/stasis-cache-split r393969 - in /team/dlee/stasis-cache-spl...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Jul 10 08:51:51 CDT 2013


Author: dlee
Date: Wed Jul 10 08:51:49 2013
New Revision: 393969

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=393969
Log:
so much change

Modified:
    team/dlee/stasis-cache-split/include/asterisk/stasis.h
    team/dlee/stasis-cache-split/main/stasis_cache.c

Modified: team/dlee/stasis-cache-split/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/stasis.h?view=diff&rev=393969&r1=393968&r2=393969
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/stasis.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/stasis.h Wed Jul 10 08:51:49 2013
@@ -634,6 +634,8 @@
  * is updated, and a stasis_cache_update() message is forwarded, which has both
  * the original snapshot message and the new message.
  *
+ * The returned object is AO2 managed, so ao2_cleanup() when done with it.
+ *
  * \param original_topic Topic publishing snapshot messages.
  * \param cache Backend cache in which to keep snapshots.
  * \return New topic which changes snapshot messages to stasis_cache_update()
@@ -645,20 +647,22 @@
 	struct stasis_topic *original_topic, struct stasis_cache *cache);
 
 /*!
- * \brief Add a secondary cache to a caching topic.
+ * \brief Create a secondary cache off of a primary cache, indexed by a different field.
  *
  * The secondary cache should identify objects by a different field than the
  * primary cache. This is useful when snapshots may have multiple unique
  * identifiers (such as \ref ast_channel's uniqueid and name), and may be looked
  * up by either.
  *
- * \param caching_topic Topic to which to add the secondary cache.
- * \param cache Secondary cache to add to the topic.
- * \return 0 on success.
- * \return -1 on error.
- */
-int stasis_caching_topic_add_secondary_cache(
-	struct stasis_caching_topic *caching_topic, struct stasis_cache *cache);
+ * The returned object is AO2 managed, so ao2_cleanup() when done with it.
+ *
+ * \param primary_cache Primary cache.
+ * \param id_fn Identity function for the secondary cache.
+ * \return Secondary cache
+ * \return \c NULL on error
+ */
+struct stasis_cache *stasis_cache_create_secondary(
+	struct stasis_cache *primary_cache, snapshot_get_id id_fn);
 
 /*!
  * \brief Unsubscribes a caching topic from its upstream topic.
@@ -739,6 +743,43 @@
  */
 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache,
 	struct stasis_message_type *type);
+
+/*! @} */
+
+/*! @{
+ * \defgroup StasisCachePattern Stasis caching pattern.
+ *
+ * It's a common pattern to have:
+ *  1. Individual topics per-object
+ *  2. Corresponding individual caching topics
+ *  3. A shared cache for the caching topics
+ *  4. An aggregation topic collecting the individual topics
+ *  5. An aggregation topic collecting the caching topics
+ *
+ * The shared objects (the cache and aggregation topics) are built with \ref
+ * stasis_cache_all. The individual objects are built with \ref
+ * stasis_cache_one. The various forwards and relationships between the objects
+ * are constructed automagically.
+ */
+
+/*! \brief Shared objects for the Stasis caching pattern. */
+struct stasis_cache_all;
+
+struct stasis_cache_all *stasis_cache_all_create(const char *base_name,
+	snapshot_get_id id_fn);
+
+struct stasis_topic *stasis_cache_all_topic(struct stasis_cache_all *all);
+
+struct stasis_topic *stasis_cache_all_topic_cached(struct stasis_cache_all *all);
+
+struct stasis_cache *stasis_cache_all_cache(struct stasis_cache_all *all);
+
+/*! \brief Per-instance objects for the Stasis caching pattern. */
+struct stasis_cache_one;
+
+struct stasis_topic *stasis_cache_one_topic(struct stasis_cache_one *one);
+
+struct stasis_topic *stasis_cache_one_topic_cached(struct stasis_cache_one *one);
 
 /*! @} */
 

Modified: team/dlee/stasis-cache-split/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_cache.c?view=diff&rev=393969&r1=393968&r2=393969
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_cache.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_cache.c Wed Jul 10 08:51:49 2013
@@ -47,6 +47,7 @@
 struct stasis_cache {
 	struct ao2_container *entries;
 	snapshot_get_id id_fn;
+	struct stasis_cache *secondary_cache;
 };
 
 /*! \internal */
@@ -182,6 +183,8 @@
 
 	ao2_cleanup(cache->entries);
 	cache->entries = NULL;
+	ao2_cleanup(cache->secondary_cache);
+	cache->secondary_cache = NULL;
 }
 
 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
@@ -205,6 +208,31 @@
 	return cache;
 }
 
+struct stasis_cache *stasis_cache_create_secondary(
+	struct stasis_cache *primary_cache, snapshot_get_id id_fn)
+{
+	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+
+	if (!primary_cache) {
+		return NULL;
+	}
+
+	cache = stasis_cache_create(id_fn);
+	if (!cache) {
+		return NULL;
+	}
+
+	{
+		SCOPED_AO2LOCK(lock, parent);
+		cache->secondary_cache = parent->secondary_cache;
+		ao2_ref(cache, +1);
+		parent->secondary_cache = cache;
+	}
+
+	ao2_ref(cache, +1);
+	return cache;
+}
+
 struct stasis_message *stasis_cache_get(struct stasis_cache *cache,
 	struct stasis_message_type *type, const char *id)
 {
@@ -227,24 +255,58 @@
 }
 
 static struct stasis_message *cache_put(struct stasis_cache *cache,
-	struct stasis_message_type *type, const char *id,
-	struct stasis_message *new_snapshot)
-{
-	RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
+	struct stasis_message *message);
+
+static void cache_put_secondary(struct stasis_cache *cache,
+	struct stasis_message *message)
+{
+	RAII_VAR(struct stasis_cache *, secondary_cache, NULL, ao2_cleanup);
+
+	{
+		SCOPED_AO2LOCK(lock, cache);
+		secondary_cache = cache->secondary_cache;
+		if (!secondary_cache) {
+			return;
+		}
+		ao2_ref(secondary_cache, +1);
+	}
+
+	cache_put(secondary_cache, message);
+}
+
+static struct stasis_message *cache_put(struct stasis_cache *cache,
+	struct stasis_message *message)
+{
 	RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
-	struct stasis_message *old_snapshot = NULL;
-
-	ast_assert(cache->entries != NULL);
-
-	new_entry = cache_entry_create(type, id, new_snapshot);
-
-	if (new_snapshot == NULL) {
-		/* Remove entry from cache */
-		cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_UNLINK);
+	const char *id;
+
+	/* update secondary cache */
+	cache_put_secondary(cache, message);
+
+	/* Handle cache clear event */
+	if (stasis_cache_clear_type() == stasis_message_type(message)) {
+		RAII_VAR(struct cache_entry *, clear_entry, NULL, ao2_cleanup);
+		struct stasis_message *clear_snapshot =
+			stasis_message_data(message);
+		id = cache->id_fn(clear_snapshot);
+
+		clear_entry = cache_entry_create(type, id, clear_snapshot);
+
+		cached_entry = ao2_find(cache->entries, clear_entry, OBJ_POINTER | OBJ_UNLINK);
 		if (cached_entry) {
 			old_snapshot = cached_entry->snapshot;
 			cached_entry->snapshot = NULL;
 		}
+	}
+
+	if (!id) {
+		return NULL;
+	}
+
+	ast_assert(cache->entries != NULL);
+
+
+	if (is_clear) {
 	} else {
 		/* Insert/update cache */
 		SCOPED_AO2LOCK(lock, cache->entries);
@@ -259,8 +321,8 @@
 			/* Insert into the cache */
 			ao2_link_flags(cache->entries, new_entry, OBJ_NOLOCK);
 		}
-
-	}
+	}
+
 
 	return old_snapshot;
 }
@@ -369,39 +431,42 @@
 static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
 {
 	RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, new_message, NULL, ao2_cleanup);
 	struct stasis_caching_topic *caching_topic = data;
 	const char *id = NULL;
 
 	ast_assert(caching_topic->topic != NULL);
-	ast_assert(caching_topic->cache->id_fn != NULL);
+	ast_assert(caching_topic->cache != NULL);
 
 	if (stasis_subscription_final_message(sub, message)) {
 		caching_topic_needs_unref = caching_topic;
 	}
 
-	/* Handle cache clear event */
+	new_message = cache_put(caching_topic->cache, message);
+	if (new_message) {
+		stasis_forward_message(caching_topic->topic, topic, message);
+
+	}
+
 	if (stasis_cache_clear_type() == stasis_message_type(message)) {
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
 		RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
 		struct stasis_message *clear_msg = stasis_message_data(message);
-		const char *clear_id = caching_topic->cache->id_fn(clear_msg);
 		struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
 
 		ast_assert(clear_type != NULL);
 
-		if (clear_id) {
-			old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
-			if (old_snapshot) {
-				update = update_create(topic, old_snapshot, NULL);
-				stasis_publish(caching_topic->topic, update);
-				return;
-			}
-
-			ast_log(LOG_ERROR,
-				"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
-				stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
+		old_snapshot = cache_put(caching_topic->cache, clear_type, clear_msg, 1);
+		if (old_snapshot) {
+			update = update_create(topic, old_snapshot, NULL);
+			stasis_publish(caching_topic->topic, update);
 			return;
 		}
+
+		ast_log(LOG_ERROR,
+			"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
+			stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
+		return;
 	}
 
 	id = caching_topic->cache->id_fn(message);
@@ -413,7 +478,7 @@
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
 		RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
 
-		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
+		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), message, );
 
 		update = update_create(topic, old_snapshot, message);
 		if (update == NULL) {




More information about the asterisk-commits mailing list