[svn-commits] kmoore: branch kmoore/stasis-mwi r382370 - in /team/kmoore/stasis-mwi: includ...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Mar 4 09:32:28 CST 2013


Author: kmoore
Date: Mon Mar  4 09:32:24 2013
New Revision: 382370

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382370
Log:
Add subscription change notifications and tests

This adds subscription change notifications as messages of the
stasis_subscription_change type, delivered inline with other messages.
This is important for services like voicemail's pollmailboxes that need
to know what is being subscribed to.

This also adds tests for the subscription change notices and tests for
the new cache dumping mechanism.

Modified:
    team/kmoore/stasis-mwi/include/asterisk/stasis.h
    team/kmoore/stasis-mwi/main/stasis.c
    team/kmoore/stasis-mwi/tests/test_stasis.c

Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382370&r1=382369&r2=382370
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Mon Mar  4 09:32:24 2013
@@ -343,6 +343,25 @@
  * \since 12
  */
 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*!
+ * \brief Holds details about changes to subscriptions for the specified topic
+ * \since 12
+ */
+struct stasis_subscription_change {
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(uniqueid);	/*!< The unique ID associated with this subscription */
+		AST_STRING_FIELD(description);	/*!< The description of the change to the subscription associated with the uniqueid */
+	);
+	struct stasis_topic *topic;		/*!< The topic the subscription is/was subscribing to */
+};
+
+/*!
+ * \brief Gets the message type for subscription change notices
+ * \return The stasis_message_type for subscription change notices
+ * \since 12
+ */
+struct stasis_message_type *stasis_subscription_change(void);
 
 /*! @} */
 

Modified: team/kmoore/stasis-mwi/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis.c?view=diff&rev=382370&r1=382369&r2=382370
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis.c (original)
+++ team/kmoore/stasis-mwi/main/stasis.c Mon Mar  4 09:32:24 2013
@@ -43,6 +43,8 @@
 
 /*! Threadpool for dispatching notifications to subscribers */
 static struct ast_threadpool *pool;
+
+static struct stasis_message_type *__subscription_change_message_type;
 
 /*! \private */
 struct stasis_topic {
@@ -103,6 +105,8 @@
 
 /*! \private */
 struct stasis_subscription {
+	/*! Unique ID for this subscription */
+	char *id;
 	/*! Native pointer to the topic. AO2 ref could cause a cycle. */
 	struct stasis_topic *topic;
 	/*! Mailbox for processing incoming messages. */
@@ -123,6 +127,8 @@
 	sub->mailbox = NULL;
 }
 
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description, int topic_locked);
+
 static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
 {
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
@@ -147,6 +153,7 @@
 		}
 	}
 
+	sub->id = ast_strdup(stupid_name);
 	sub->topic = topic; /* Don't increase the refcount, or it will cause a cyclic ref! */
 	sub->callback = callback;
 	sub->data = data;
@@ -154,6 +161,7 @@
 	if (topic_add_subscription(topic, sub) != 0) {
 		return NULL;
 	}
+	send_subscription_change_message(topic, stupid_name, "Subscribe", 0);
 
 	ao2_ref(sub, +1);
 	return sub;
@@ -177,6 +185,7 @@
 
 			for (i = 0; i < topic->num_subscribers_current; ++i) {
 				if (topic->subscribers[i] == sub) {
+					send_subscription_change_message(topic, sub->id, "Unsubscribe", 1);
 					/* swap [i] with last entry; remove last entry */
 					topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
 					/* We can't clean up now, since the lock is held. defer to RAII */
@@ -297,13 +306,17 @@
 	return 0;
 }
 
-void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
-{
-	RAII_VAR(struct stasis_subscription **, subscribers, NULL, ast_free);
+static void __forward_message_full(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message, int topic_locked)
+{
+	struct stasis_subscription **subscribers;
 	size_t num_subscribers, i;
 
-	/* Copy the subscribers, so we don't have to hold the mutex for long */
-	{
+	if (topic_locked) {
+		/* topic is already locked, we don't control how long it is held */
+		num_subscribers = topic->num_subscribers_current;
+		subscribers = topic->subscribers;
+	} else {
+		/* Copy the subscribers, so we don't have to hold the mutex for long */
 		SCOPED_AO2LOCK(lock, topic);
 		num_subscribers = topic->num_subscribers_current;
 		subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
@@ -341,6 +354,15 @@
 			sub->callback(sub->data, sub->topic, message);
 		}
 	}
+
+	if (!topic_locked) {
+		ast_free(subscribers);
+	}
+}
+
+void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+	__forward_message_full(topic, publisher_topic, message, 0);
 }
 
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
@@ -362,6 +384,58 @@
 	}
 	/* Subscribe without a mailbox, since we're just forwarding messages */
 	return __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+}
+
+static void subscription_change_dtor(void *obj)
+{
+	struct stasis_subscription_change *change = obj;
+	ast_string_field_free_memory(change);
+	ao2_cleanup(change->topic);
+}
+
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+{
+	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+
+	change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
+	if (ast_string_field_init(change, 128)) {
+		return NULL;
+	}
+
+	ast_string_field_set(change, uniqueid, uniqueid);
+	ast_string_field_set(change, description, description);
+	ao2_ref(topic, +1);
+	change->topic = topic;
+
+	ao2_ref(change, +1);
+	return change;
+}
+
+struct stasis_message_type *stasis_subscription_change(void)
+{
+	return __subscription_change_message_type;
+}
+
+static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description, int topic_locked)
+{
+	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+	change = subscription_change_alloc(topic, uniqueid, description);
+
+	if (!change) {
+		return;
+	}
+
+	msg = stasis_message_create(stasis_subscription_change(), change);
+
+	if (!msg) {
+		return;
+	}
+
+	ao2_ref(change, +1);
+	ao2_ref(msg, +1);
+	__forward_message_full(topic, topic, msg, topic_locked);
 }
 
 /*! \brief Cleanup function */
@@ -402,5 +476,7 @@
 		return -1;
 	}
 
+	__subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
+
 	return 0;
 }

Modified: team/kmoore/stasis-mwi/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/tests/test_stasis.c?view=diff&rev=382370&r1=382369&r2=382370
==============================================================================
--- team/kmoore/stasis-mwi/tests/test_stasis.c (original)
+++ team/kmoore/stasis-mwi/tests/test_stasis.c Mon Mar  4 09:32:24 2013
@@ -115,6 +115,14 @@
 	ast_cond_t out;
 	struct stasis_message **messages_rxed;
 	size_t messages_rxed_len;
+	size_t subs_rxed;
+	size_t unsubs_rxed;
+};
+
+enum consumer_rx_type {
+	MESSAGE,
+	SUBSCRIPTION,
+	UNSUBSCRIPTION,
 };
 
 static void consumer_dtor(void *obj) {
@@ -151,6 +159,18 @@
 	struct consumer *consumer = data;
 
 	SCOPED_MUTEX(lock, &consumer->lock);
+	if (stasis_message_type(message) == stasis_subscription_change()) {
+		struct stasis_subscription_change *change = stasis_message_data(message);
+		if (!strcmp("Subscribe", change->description)) {
+			++consumer->subs_rxed;
+		} else if (!strcmp("Unsubscribe", change->description)) {
+			++consumer->unsubs_rxed;
+		}
+
+		ast_cond_signal(&consumer->out);
+		return;
+	}
+
 	++consumer->messages_rxed_len;
 	consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
 	ast_assert(consumer->messages_rxed != NULL);
@@ -160,26 +180,40 @@
 	ast_cond_signal(&consumer->out);
 }
 
-static int consumer_wait_for(struct consumer *consumer, size_t expected_len) {
+static int consumer_wait_for(struct consumer *consumer, size_t expected_len, enum consumer_rx_type type) {
 	struct timeval start = ast_tvnow();
 	struct timespec end = {
 		.tv_sec = start.tv_sec + 3600,
 		.tv_nsec = start.tv_usec * 1000
 	};
+	size_t *wait_var;
 
 	SCOPED_MUTEX(lock, &consumer->lock);
-
-	while (consumer->messages_rxed_len < expected_len) {
+	switch (type) {
+	case MESSAGE:
+		wait_var = &consumer->messages_rxed_len;
+		break;
+	case SUBSCRIPTION:
+		wait_var = &consumer->subs_rxed;
+		break;
+	case UNSUBSCRIPTION:
+		wait_var = &consumer->unsubs_rxed;
+		break;
+	default:
+		return -1;
+	}
+
+	while (*wait_var < expected_len) {
 		int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
 		if (r == ETIMEDOUT) {
 			break;
 		}
 		ast_assert(r == 0); /* Not expecting any othet types of errors */
 	}
-	return consumer->messages_rxed_len;
-}
-
-static int consumer_should_stay(struct consumer *consumer, size_t expected_len) {
+	return *wait_var;
+}
+
+static int consumer_should_stay(struct consumer *consumer, size_t expected_len, enum consumer_rx_type type) {
 	struct timeval start = ast_tvnow();
 	struct timeval diff = {
 		.tv_sec = 0,
@@ -190,17 +224,31 @@
 		.tv_sec = end_tv.tv_sec,
 		.tv_nsec = end_tv.tv_usec * 1000
 	};
+	size_t *wait_var;
 
 	SCOPED_MUTEX(lock, &consumer->lock);
-
-	while (consumer->messages_rxed_len == expected_len) {
+	switch (type) {
+	case MESSAGE:
+		wait_var = &consumer->messages_rxed_len;
+		break;
+	case SUBSCRIPTION:
+		wait_var = &consumer->subs_rxed;
+		break;
+	case UNSUBSCRIPTION:
+		wait_var = &consumer->unsubs_rxed;
+		break;
+	default:
+		return -1;
+	}
+
+	while (*wait_var == expected_len) {
 		int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
 		if (r == ETIMEDOUT) {
 			break;
 		}
 		ast_assert(r == 0); /* Not expecting any othet types of errors */
 	}
-	return consumer->messages_rxed_len;
+	return *wait_var;
 }
 
 AST_TEST_DEFINE(subscribe)
@@ -225,7 +273,7 @@
 		break;
 	}
 
-	topic = stasis_topic_create("TestTopic");
+	topic = stasis_topic_create("SubscribeTopic");
 	ast_test_validate(test, NULL != topic);
 
 	consumer = consumer_create();
@@ -233,6 +281,9 @@
 
 	uut = stasis_subscribe(topic, consumer_exec, consumer);
 	ast_test_validate(test, NULL != uut);
+
+	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer->subs_rxed);
 
 	test_data = ao2_alloc(1, NULL);
 	ast_test_validate(test, NULL != test_data);
@@ -241,7 +292,7 @@
 
 	stasis_publish(topic, test_message);
 
-	actual_len = consumer_wait_for(consumer, 1);
+	actual_len = consumer_wait_for(consumer, 1, MESSAGE);
 	ast_test_validate(test, 1 == actual_len);
 	actual = stasis_message_data(consumer->messages_rxed[0]);
 	ast_test_validate(test, test_data == actual);
@@ -270,7 +321,7 @@
 		break;
 	}
 
-	topic = stasis_topic_create("TestTopic");
+	topic = stasis_topic_create("UnsubsribeTopic");
 	ast_test_validate(test, NULL != topic);
 
 	consumer = consumer_create();
@@ -279,7 +330,13 @@
 	uut = stasis_subscribe(topic, consumer_exec, consumer);
 	ast_test_validate(test, NULL != uut);
 
+	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer->subs_rxed);
+
 	stasis_unsubscribe(uut);
+
+	actual_len = consumer_wait_for(consumer, 1, UNSUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer->unsubs_rxed);
 
 	test_data = ao2_alloc(1, NULL);
 	ast_test_validate(test, NULL != test_data);
@@ -288,7 +345,7 @@
 
 	stasis_publish(topic, test_message);
 
-	actual_len = consumer_should_stay(consumer, 0);
+	actual_len = consumer_should_stay(consumer, 0, MESSAGE);
 	ast_test_validate(test, 0 == actual_len);
 
 	return AST_TEST_PASS;
@@ -326,9 +383,9 @@
 		break;
 	}
 
-	parent_topic = stasis_topic_create("ParentTestTopic");
+	parent_topic = stasis_topic_create("ParentForwardTopic");
 	ast_test_validate(test, NULL != parent_topic);
-	topic = stasis_topic_create("TestTopic");
+	topic = stasis_topic_create("ForwardTopic");
 	ast_test_validate(test, NULL != topic);
 
 	forward_sub = stasis_forward_all(topic, parent_topic);
@@ -341,8 +398,18 @@
 
 	parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
 	ast_test_validate(test, NULL != parent_sub);
+
+	actual_len = consumer_wait_for(parent_consumer, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == parent_consumer->subs_rxed);
+
 	sub = stasis_subscribe(topic, consumer_exec, consumer);
 	ast_test_validate(test, NULL != sub);
+
+	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer->subs_rxed);
+
+	actual_len = consumer_wait_for(parent_consumer, 2, SUBSCRIPTION);
+	ast_test_validate(test, 2 == parent_consumer->subs_rxed);
 
 	test_data = ao2_alloc(1, NULL);
 	ast_test_validate(test, NULL != test_data);
@@ -351,9 +418,9 @@
 
 	stasis_publish(topic, test_message);
 
-	actual_len = consumer_wait_for(consumer, 1);
+	actual_len = consumer_wait_for(consumer, 1, MESSAGE);
 	ast_test_validate(test, 1 == actual_len);
-	actual_len = consumer_wait_for(parent_consumer, 1);
+	actual_len = consumer_wait_for(parent_consumer, 1, MESSAGE);
 	ast_test_validate(test, 1 == actual_len);
 
 	return AST_TEST_PASS;
@@ -407,7 +474,9 @@
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
 	int actual_len;
 	struct stasis_message_type *actual_type;
@@ -425,21 +494,41 @@
 
 	non_cache_type = stasis_message_type_create("NonCacheable");
 	ast_test_validate(test, NULL != non_cache_type);
-	topic = stasis_topic_create("SomeTopic");
+
+	topic = stasis_topic_create("CachePassthroughTopic");
 	ast_test_validate(test, NULL != topic);
+	consumer_non_caching = consumer_create();
+	ast_test_validate(test, NULL != consumer_non_caching);
+	sub_non_caching = stasis_subscribe(topic, consumer_exec, consumer_non_caching);
+	ast_test_validate(test, NULL != sub_non_caching);
+
+	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
+
 	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
 	ast_test_validate(test, NULL != caching_topic);
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
+
+	/* This isn't actually touched by anything, but should force steady-state
+	 * on the caching topic so the subscription below does not see the
+	 * subscription change notification generated by the creation of the
+	 * caching topic */
+	actual_len = consumer_should_stay(consumer, 0, SUBSCRIPTION);
+	ast_test_validate(test, 0 == consumer->subs_rxed);
+
 	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
 	ast_test_validate(test, NULL != sub);
 
+	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer->subs_rxed);
+
 	test_message = cache_test_message_create(non_cache_type, "1", "1");
 	ast_test_validate(test, NULL != test_message);
 
 	stasis_publish(topic, test_message);
 
-	actual_len = consumer_wait_for(consumer, 1);
+	actual_len = consumer_wait_for(consumer, 1, MESSAGE);
 	ast_test_validate(test, 1 == actual_len);
 
 	actual_type = stasis_message_type(consumer->messages_rxed[0]);
@@ -456,7 +545,9 @@
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
@@ -464,6 +555,7 @@
 	int actual_len;
 	struct stasis_cache_update *actual_update;
 	struct stasis_message *actual_message;
+	struct ao2_container *cache_dump;
 
 	switch (cmd) {
 	case TEST_INIT:
@@ -478,14 +570,33 @@
 
 	cache_type = stasis_message_type_create("Cacheable");
 	ast_test_validate(test, NULL != cache_type);
-	topic = stasis_topic_create("SomeTopic");
+
+	topic = stasis_topic_create("CacheTopic");
 	ast_test_validate(test, NULL != topic);
+	consumer_non_caching = consumer_create();
+	ast_test_validate(test, NULL != consumer_non_caching);
+	sub_non_caching = stasis_subscribe(topic, consumer_exec, consumer_non_caching);
+	ast_test_validate(test, NULL != sub_non_caching);
+
+	actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
+
 	caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
 	ast_test_validate(test, NULL != caching_topic);
 	consumer = consumer_create();
 	ast_test_validate(test, NULL != consumer);
+
+	/* This isn't actually touched by anything, but should force steady-state
+	 * on the caching topic so the subscription below does not see the
+	 * subscription change notification generated by the creation of the
+	 * caching topic */
+	actual_len = consumer_should_stay(consumer, 0, SUBSCRIPTION);
+
 	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
 	ast_test_validate(test, NULL != sub);
+
+	actual_len = consumer_wait_for(consumer, 1, SUBSCRIPTION);
+	ast_test_validate(test, 1 == consumer->subs_rxed);
 
 	test_message1_1 = cache_test_message_create(cache_type, "1", "1");
 	ast_test_validate(test, NULL != test_message1_1);
@@ -495,8 +606,14 @@
 	/* Post a couple of snapshots */
 	stasis_publish(topic, test_message1_1);
 	stasis_publish(topic, test_message2_1);
-	actual_len = consumer_wait_for(consumer, 2);
+	actual_len = consumer_wait_for(consumer, 2, MESSAGE);
 	ast_test_validate(test, 2 == actual_len);
+
+	/* Dump the cache to ensure that it has the correct number of items in it */
+	cache_dump = stasis_cache_dump(caching_topic, NULL);
+	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
 
 	/* Check for new snapshot messages */
 	ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0]));
@@ -519,7 +636,7 @@
 	ast_test_validate(test, NULL != test_message2_2);
 	stasis_publish(topic, test_message2_2);
 
-	actual_len = consumer_wait_for(consumer, 3);
+	actual_len = consumer_wait_for(consumer, 3, MESSAGE);
 	ast_test_validate(test, 3 == actual_len);
 
 	actual_update = stasis_message_data(consumer->messages_rxed[2]);
@@ -528,12 +645,18 @@
 	ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
 	ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2"));
 
+	/* Dump the cache to ensure that it has the correct number of items in it */
+	cache_dump = stasis_cache_dump(caching_topic, NULL);
+	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
+
 	/* Clear snapshot 1 */
 	test_message1_clear = stasis_cache_clear_create(cache_type, "1");
 	ast_test_validate(test, NULL != test_message1_clear);
 	stasis_publish(topic, test_message1_clear);
 
-	actual_len = consumer_wait_for(consumer, 4);
+	actual_len = consumer_wait_for(consumer, 4, MESSAGE);
 	ast_test_validate(test, 4 == actual_len);
 
 	actual_update = stasis_message_data(consumer->messages_rxed[3]);
@@ -541,6 +664,18 @@
 	ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
 	ast_test_validate(test, NULL == actual_update->new_snapshot);
 	ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
+
+	/* Dump the cache to ensure that it has the correct number of items in it */
+	cache_dump = stasis_cache_dump(caching_topic, NULL);
+	ast_test_validate(test, 1 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
+
+	/* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
+	cache_dump = stasis_cache_dump(caching_topic, stasis_subscription_change());
+	ast_test_validate(test, 0 == ao2_container_count(cache_dump));
+	ao2_ref(cache_dump, -1);
+	cache_dump = NULL;
 
 	return AST_TEST_PASS;
 }




More information about the svn-commits mailing list