[asterisk-commits] dlee: branch dlee/stasis-http r382726 - in /team/dlee/stasis-http: ./ include...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Mar 8 11:02:57 CST 2013


Author: dlee
Date: Fri Mar  8 11:02:53 2013
New Revision: 382726

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382726
Log:
Merged revisions 382725 from http://svn.asterisk.org/svn/asterisk/team/dlee/stasis-app

Modified:
    team/dlee/stasis-http/   (props changed)
    team/dlee/stasis-http/include/asterisk/stasis.h
    team/dlee/stasis-http/main/channel.c
    team/dlee/stasis-http/main/channel_internal_api.c
    team/dlee/stasis-http/main/stasis.c
    team/dlee/stasis-http/main/stasis_cache.c
    team/dlee/stasis-http/tests/test_stasis.c

Propchange: team/dlee/stasis-http/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Mar  8 11:02:53 2013
@@ -1,1 +1,1 @@
-/team/dlee/stasis-app:1-382700 /trunk:1-382685
+/team/dlee/stasis-app:1-382725 /trunk:1-382724

Modified: team/dlee/stasis-http/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/include/asterisk/stasis.h?view=diff&rev=382726&r1=382725&r2=382726
==============================================================================
--- team/dlee/stasis-http/include/asterisk/stasis.h (original)
+++ team/dlee/stasis-http/include/asterisk/stasis.h Fri Mar  8 11:02:53 2013
@@ -35,9 +35,6 @@
  * within Asterisk. It is designed to be:
  *  - Loosely coupled; new message types can be added in separate modules.
  *  - Easy to use; publishing and subscribing are straightforward operations.
- *  - Consistent memory management; all message bus objects are AO2 managed
- *    objects, using ao2_ref() and ao2_cleanup() to manage the reference
- *    counting.
  *
  * There are three main concepts for using the Stasis Message Bus:
  *  - \ref stasis_message
@@ -75,11 +72,12 @@
  * to the topic is dispatched to all of its subscribers. The topic itself may be
  * named, which is useful in debugging.
  *
- * Topics themselves are reference counted objects, and automagically
- * unsubscribe all of their subscribers when they are destroyed. Topics are also
- * thread safe, so no worries about publishing/subscribing/unsubscribing to a
- * topic concurrently from multiple threads. It's also designed to handle the
- * case of unsubscribing from a topic from within the subscription handler.
+ * Topics themselves are reference counted objects. Since topics are referred to
+ * by their subscribers, they will not be freed until all of their subscribers
+ * have unsubscribed. Topics are also thread safe, so no worries about
+ * publishing/subscribing/unsubscribing to a topic concurrently from multiple
+ * threads. It's also designed to handle the case of unsubscribing from a topic
+ * from within the subscription handler.
  *
  * \par Forwarding
  *
@@ -110,8 +108,10 @@
  * removed from the cache). A stasis_cache_clear_create() message must be sent
  * to the topic in order to remove entries from the cache.
  *
- * As with all things Stasis, the \ref stasis_caching_topic is a reference
- * counted AO2 object.
+ * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
+ * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
+ * stasis_caching_topic will not be freed until after it has been unsubscribed,
+ * and all other ao2_ref()'s have been cleaned up.
  *
  * \par stasis_subscriber
  *
@@ -122,12 +122,10 @@
  * threads (this usually isn't important unless you use thread locals or
  * something similar).
  *
- * Since the topic (by necessity) holds a reference to the subscription,
- * reference counting alone is insufficient to terminate a subscription. In
- * order to stop receiving messages, call stasis_unsubscribe() with your \ref
- * stasis_subscription. This will remove the topic's reference to the
- * subscription, and allow it to be destroyed when all of the other references
- * are cleaned up.
+ * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
+ * stasis_subscription. Due to cyclic references, the \ref
+ * stasis_subscription will not be freed until after it has been unsubscribed,
+ * and all other ao2_ref()'s have been cleaned up.
  */
 
 #include "asterisk/utils.h"
@@ -480,6 +478,17 @@
 					struct stasis_message_type *type,
 					const char *id);
 
+/*!
+ * \brief Dump cached items into a newly allocated container
+ * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param type Type of message to dump (any type if NULL).
+ * \return ao2_container containing all matches (must be unreffed by caller)
+ * \return NULL on allocation error
+ * \since 12
+ */
+struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
+					struct stasis_message_type *type);
+
 /*! @} */
 
 /*! @{ */

Modified: team/dlee/stasis-http/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/main/channel.c?view=diff&rev=382726&r1=382725&r2=382726
==============================================================================
--- team/dlee/stasis-http/main/channel.c (original)
+++ team/dlee/stasis-http/main/channel.c Fri Mar  8 11:02:53 2013
@@ -1282,6 +1282,8 @@
 #ifdef HAVE_EPOLL
 	ast_channel_epfd_set(tmp, -1);
 #endif
+
+	ast_channel_internal_setup_topics(tmp);
 
 	headp = ast_channel_varshead(tmp);
 	AST_LIST_HEAD_INIT_NOLOCK(headp);

Modified: team/dlee/stasis-http/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/main/channel_internal_api.c?view=diff&rev=382726&r1=382725&r2=382726
==============================================================================
--- team/dlee/stasis-http/main/channel_internal_api.c (original)
+++ team/dlee/stasis-http/main/channel_internal_api.c Fri Mar  8 11:02:53 2013
@@ -1391,8 +1391,14 @@
 
 void ast_channel_internal_setup_topics(struct ast_channel *chan)
 {
+	const char *topic_name = chan->uniqueid;
 	ast_assert(chan->topic == NULL);
 	ast_assert(chan->forwarder == NULL);
-	chan->topic = stasis_topic_create(chan->uniqueid);
+
+	if (ast_strlen_zero(topic_name)) {
+		topic_name = "<dummy-channel>";
+	}
+
+	chan->topic = stasis_topic_create(topic_name);
 	chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
 }

Modified: team/dlee/stasis-http/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/main/stasis.c?view=diff&rev=382726&r1=382725&r2=382726
==============================================================================
--- team/dlee/stasis-http/main/stasis.c (original)
+++ team/dlee/stasis-http/main/stasis.c Fri Mar  8 11:02:53 2013
@@ -337,6 +337,10 @@
 	struct stasis_subscription **subscribers = NULL;
 	size_t num_subscribers, i;
 
+	ast_assert(topic != NULL);
+	ast_assert(publisher_topic != NULL);
+	ast_assert(message != NULL);
+
 	/* Copy the subscribers, so we don't have to hold the mutex for long */
 	{
 		SCOPED_AO2LOCK(lock, topic);

Modified: team/dlee/stasis-http/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/main/stasis_cache.c?view=diff&rev=382726&r1=382725&r2=382726
==============================================================================
--- team/dlee/stasis-http/main/stasis_cache.c (original)
+++ team/dlee/stasis-http/main/stasis_cache.c Fri Mar  8 11:02:53 2013
@@ -42,6 +42,7 @@
 #define NUM_CACHE_BUCKETS 563
 #endif
 
+/*! \private */
 struct stasis_caching_topic {
 	struct ao2_container *cache;
 	struct stasis_topic *topic;
@@ -203,6 +204,39 @@
 	ast_assert(cached_entry->snapshot != NULL);
 	ao2_ref(cached_entry->snapshot, +1);
 	return cached_entry->snapshot;
+}
+
+struct cache_dump_data {
+	struct ao2_container *cached;
+	struct stasis_message_type *type;
+};
+
+static int cache_dump_cb(void *obj, void *arg, int flags)
+{
+	struct cache_dump_data *cache_dump = arg;
+	struct cache_entry *entry = obj;
+
+	if (!cache_dump->type || entry->type == cache_dump->type) {
+		ao2_link(cache_dump->cached, entry->snapshot);
+	}
+
+	return 0;
+}
+
+struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
+{
+	struct cache_dump_data cache_dump;
+
+	ast_assert(caching_topic->cache != NULL);
+
+	cache_dump.type = type;
+	cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
+	if (!cache_dump.cached) {
+		return NULL;
+	}
+
+	ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
+	return cache_dump.cached;
 }
 
 static struct stasis_message_type *__cache_clear_data;

Modified: team/dlee/stasis-http/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/tests/test_stasis.c?view=diff&rev=382726&r1=382725&r2=382726
==============================================================================
--- team/dlee/stasis-http/tests/test_stasis.c (original)
+++ team/dlee/stasis-http/tests/test_stasis.c Fri Mar  8 11:02:53 2013
@@ -564,6 +564,7 @@
 	RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
 	int actual_len;
 	struct stasis_cache_update *actual_update;
+	struct ao2_container *cache_dump;
 
 	switch (cmd) {
 	case TEST_INIT:
@@ -599,6 +600,12 @@
 	actual_len = consumer_wait_for(consumer, 2);
 	ast_test_validate(test, 2 == actual_len);
 
+	/* Dump the cache to ensure that it has the correct number of items in it */
+	cache_dump = stasis_cache_dump(caching_topic, NULL);
+	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
+
 	/* Check for new snapshot messages */
 	ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0]));
 	actual_update = stasis_message_data(consumer->messages_rxed[0]);
@@ -634,6 +641,12 @@
 	/* stasis_cache_get returned a ref, so unref test_message2_2 */
 	ao2_ref(test_message2_2, -1);
 
+	/* Dump the cache to ensure that it has the correct number of items in it */
+	cache_dump = stasis_cache_dump(caching_topic, NULL);
+	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
+
 	/* Clear snapshot 1 */
 	test_message1_clear = stasis_cache_clear_create(cache_type, "1");
 	ast_test_validate(test, NULL != test_message1_clear);
@@ -647,6 +660,18 @@
 	ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
 	ast_test_validate(test, NULL == actual_update->new_snapshot);
 	ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
+
+	/* Dump the cache to ensure that it has the correct number of items in it */
+	cache_dump = stasis_cache_dump(caching_topic, NULL);
+	ast_test_validate(test, 1 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
+
+	/* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
+	cache_dump = stasis_cache_dump(caching_topic, stasis_subscription_change());
+	ast_test_validate(test, 0 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
 
 	return AST_TEST_PASS;
 }




More information about the asterisk-commits mailing list