[asterisk-commits] dlee: branch dlee/stasis-cache-split r393986 - /team/dlee/stasis-cache-split/...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Jul 10 11:24:10 CDT 2013


Author: dlee
Date: Wed Jul 10 11:24:08 2013
New Revision: 393986

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=393986
Log:
That's a bit better

Modified:
    team/dlee/stasis-cache-split/main/stasis_cache.c

Modified: team/dlee/stasis-cache-split/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_cache.c?view=diff&rev=393986&r1=393985&r2=393986
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_cache.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_cache.c Wed Jul 10 11:24:08 2013
@@ -56,7 +56,6 @@
 	struct stasis_topic *topic;
 	struct stasis_topic *original_topic;
 	struct stasis_subscription *sub;
-	snapshot_get_id id_fn;
 };
 
 static void stasis_caching_topic_dtor(void *obj) {
@@ -152,7 +151,7 @@
 	return entry;
 }
 
-__attribute__((unused)) static int cache_entry_hash(const void *obj, int flags)
+static int cache_entry_hash(const void *obj, int flags)
 {
 	const struct cache_entry *entry = obj;
 	int hash = 0;
@@ -164,7 +163,7 @@
 	return hash;
 }
 
-__attribute__((unused)) static int cache_entry_cmp(void *obj, void *arg, int flags)
+static int cache_entry_cmp(void *obj, void *arg, int flags)
 {
 	const struct cache_entry *left = obj;
 	const struct cache_entry *right = arg;
@@ -178,13 +177,73 @@
 	return 0;
 }
 
-static struct stasis_message *cache_put(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+/*! \brief Destructor given to ao2_alloc() for a stasis_cache; drops the refs it holds. */
+static void cache_dtor(void *obj)
+{
+        struct stasis_cache *self = obj;
+        /* Entry container first, then the link to the next secondary cache. */
+        ao2_cleanup(self->entries);
+        self->entries = NULL;
+        ao2_cleanup(self->secondary_cache);
+        self->secondary_cache = NULL;
+}
+
+/*! \brief Allocate a cache that uses \a id_fn to identify messages; NULL on failure. */
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+{
+        struct stasis_cache *new_cache = ao2_alloc(sizeof(*new_cache), cache_dtor);
+
+        if (!new_cache) {
+                return NULL;
+        }
+
+        new_cache->id_fn = id_fn;
+        new_cache->entries = ao2_container_alloc(NUM_CACHE_BUCKETS,
+                cache_entry_hash, cache_entry_cmp);
+        if (!new_cache->entries) {
+                /* Drop the only reference so the half-built cache is destroyed. */
+                ao2_cleanup(new_cache);
+                return NULL;
+        }
+        /* The allocation reference becomes the caller's reference. */
+        return new_cache;
+}
+
+/*!
+ * \brief Create a cache and link it in as a secondary of \a primary_cache.
+ * \return New cache (caller owns one reference), or NULL on failure.
+ */
+struct stasis_cache *stasis_cache_create_secondary(
+        struct stasis_cache *primary_cache, snapshot_get_id id_fn)
+{
+        struct stasis_cache *secondary;
+
+        secondary = primary_cache ? stasis_cache_create(id_fn) : NULL;
+        if (!secondary) {
+                return NULL;
+        }
+
+        /* Insert at the head of the primary's chain while holding its lock. */
+        ao2_lock(primary_cache);
+        secondary->secondary_cache = primary_cache->secondary_cache;
+        ao2_ref(secondary, +1); /* reference held by the primary's link */
+        primary_cache->secondary_cache = secondary;
+        ao2_unlock(primary_cache);
+
+        /* The reference from stasis_cache_create() goes to the caller. */
+        return secondary;
+}
+
+static struct stasis_message *cache_put(struct stasis_cache *cache,
+	struct stasis_message_type *type, const char *id,
+	struct stasis_message *new_snapshot)
 {
 	RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
 	RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
 	struct stasis_message *old_snapshot = NULL;
 
 	ast_assert(cache->entries != NULL);
+	ast_assert(new_snapshot == NULL ||
+		type == stasis_message_type(new_snapshot));
 
 	new_entry = cache_entry_create(type, id, new_snapshot);
 
@@ -338,14 +397,16 @@
 	return msg;
 }
 
-static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static struct stasis_message *update_cache(
+	struct stasis_cache *cache, struct stasis_caching_topic *caching_topic,
+	struct stasis_subscription *sub, struct stasis_topic *topic,
+	struct stasis_message *message)
 {
 	RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
-	struct stasis_caching_topic *caching_topic = data;
 	const char *id = NULL;
 
 	ast_assert(caching_topic->topic != NULL);
-	ast_assert(caching_topic->id_fn != NULL);
+	ast_assert(cache->id_fn != NULL);
 
 	if (stasis_subscription_final_message(sub, message)) {
 		caching_topic_needs_unref = caching_topic;
@@ -356,47 +417,78 @@
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
 		RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
 		struct stasis_message *clear_msg = stasis_message_data(message);
-		const char *clear_id = caching_topic->id_fn(clear_msg);
+		const char *clear_id = cache->id_fn(clear_msg);
 		struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
 
 		ast_assert(clear_type != NULL);
 
 		if (clear_id) {
-			old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
+			old_snapshot = cache_put(cache, clear_type, clear_id, NULL);
 			if (old_snapshot) {
 				update = update_create(topic, old_snapshot, NULL);
-				stasis_publish(caching_topic->topic, update);
-				return;
+				ao2_ref(update, +1);
+				return update;
 			}
 
 			ast_log(LOG_ERROR,
 				"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
 				stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
-			return;
+			return NULL;
 		}
 	}
 
-	id = caching_topic->id_fn(message);
+	id = cache->id_fn(message);
 	if (id == NULL) {
 		/* Object isn't cached; forward */
-		stasis_forward_message(caching_topic->topic, topic, message);
+		ao2_ref(message, +1);
+		return message;
 	} else {
 		/* Update the cache */
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
 		RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
 
-		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
+		old_snapshot = cache_put(cache, stasis_message_type(message), id, message);
 
 		update = update_create(topic, old_snapshot, message);
 		if (update == NULL) {
-			return;
+			return NULL;
 		}
 
-		stasis_publish(caching_topic->topic, update);
+		ao2_ref(update, +1);
+		return update;
 	}
 
 	if (stasis_subscription_final_message(sub, message)) {
 		ao2_cleanup(caching_topic);
+	}
+}
+
+/*! \brief Subscription callback: update the primary cache and forward its
+ * result, then update every secondary cache, discarding those results. */
+static void caching_topic_exec(void *data, struct stasis_subscription *sub,
+	struct stasis_topic *topic, struct stasis_message *message)
+{
+	struct stasis_caching_topic *caching_topic = data;
+	struct stasis_cache *cache = caching_topic->cache;
+	struct stasis_message *new_message;
+
+	new_message = update_cache(cache, caching_topic, sub, topic, message);
+	if (new_message) {
+		stasis_forward_message(caching_topic->topic, topic, new_message);
+		ao2_cleanup(new_message);
+	}
+
+	/* Read each link under its owner's lock; unlock that owner, not the link. */
+	for (;;) {
+		struct stasis_cache *locked = cache;
+		ao2_lock(locked);
+		cache = locked->secondary_cache;
+		ao2_unlock(locked);
+		if (!cache) {
+			break;
+		}
+		/* NOTE(review): update_cache() unrefs caching_topic on a final message; confirm per-cache calls do not over-release. */
+		ao2_cleanup(update_cache(cache, caching_topic, sub, topic, message));
 	}
 }
 




More information about the asterisk-commits mailing list