[asterisk-commits] dlee: branch dlee/endpoints r387475 - in /team/dlee/endpoints: include/asteri...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu May 2 13:40:03 CDT 2013


Author: dlee
Date: Thu May  2 13:40:01 2013
New Revision: 387475

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=387475
Log:
Unit tests pass

Modified:
    team/dlee/endpoints/include/asterisk/stasis_endpoints.h
    team/dlee/endpoints/include/asterisk/stasis_test.h
    team/dlee/endpoints/main/endpoints.c
    team/dlee/endpoints/main/stasis_cache.c
    team/dlee/endpoints/main/stasis_endpoints.c
    team/dlee/endpoints/res/res_stasis_test.c
    team/dlee/endpoints/tests/test_endpoints.c
    team/dlee/endpoints/tests/test_stasis_endpoints.c

Modified: team/dlee/endpoints/include/asterisk/stasis_endpoints.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/include/asterisk/stasis_endpoints.h?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/include/asterisk/stasis_endpoints.h (original)
+++ team/dlee/endpoints/include/asterisk/stasis_endpoints.h Thu May  2 13:40:01 2013
@@ -60,7 +60,7 @@
 	 */
 	int max_channels;
 	/*! Number of channels currently active on this endpoint */
-	int current_channels;
+	int num_channels;
 	/*! Channel ids */
 	char *channel_ids[];
 };

Modified: team/dlee/endpoints/include/asterisk/stasis_test.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/include/asterisk/stasis_test.h?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/include/asterisk/stasis_test.h (original)
+++ team/dlee/endpoints/include/asterisk/stasis_test.h Thu May  2 13:40:01 2013
@@ -28,7 +28,7 @@
 #include "asterisk/lock.h"
 #include "asterisk/stasis.h"
 
-#define DEFAULT_WAIT_MILLIS 5000
+#define STASIS_SINK_DEFAULT_WAIT 5000
 
 /*! \brief Structure that collects messages from a topic */
 struct stasis_message_sink {
@@ -58,10 +58,15 @@
 
 /*!
  * \brief Topic callback to receive messages.
- * See \ref stasis_subscription_cb
+ *
+ * We return a function pointer instead of simply exposing the function because
+ * of the vagaries of dlopen(), \c RTLD_LAZY, and function pointers. See the
+ * comment on the implementation for details why.
+ *
+ * \return Function pointer to \ref stasis_message_sink's message handling
+ *         function
  */
-void stasis_message_sink_cb(void *data, struct stasis_subscription *sub,
-	struct stasis_topic *topic, struct stasis_message *message);
+stasis_subscription_cb stasis_message_sink_cb(void);
 
 /*!
  * \brief Wait for a sink's num_messages field to reach a certain level.
@@ -74,8 +79,24 @@
  * \return Actual sink->num_messages value at return.
  *         If this is < \a num_messages, then the timeout expired.
  */
-int stasis_message_sink_wait_for(struct stasis_message_sink *sink,
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
 	int num_messages, int timeout_millis);
+
+typedef int (*stasis_wait_cb)(struct stasis_message *msg, const void *data);
+
+/*!
+ * \brief Wait for a message that matches the given criteria.
+ *
+ * \param sink Sink to wait on.
+ * \param start Index of message to start with.
+ * \param cmp_cb comparison function. This returns true (non-zero) on match
+ *               and false (zero) on match.
+ * \param timeout_millis Number of milliseconds to wait.
+ * \return Index of the matching message.
+ * \return Negative for no match.
+ */
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+	stasis_wait_cb cmp_cb, const void *data, int timeout_millis);
 
 /*!
  * \brief Ensures that no new messages are received.

Modified: team/dlee/endpoints/main/endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/main/endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/main/endpoints.c (original)
+++ team/dlee/endpoints/main/endpoints.c Thu May  2 13:40:01 2013
@@ -119,6 +119,7 @@
 	struct ast_endpoint *endpoint = data;
 	struct ast_channel_snapshot *snapshot = stasis_message_data(message);
 	RAII_VAR(char *, existing_id, NULL, ao2_cleanup);
+	int publish = 0;
 
 	ast_assert(endpoint != NULL);
 	ast_assert(snapshot != NULL);
@@ -129,9 +130,12 @@
 	if (!existing_id) {
 		ast_str_container_add(endpoint->channel_ids,
 			snapshot->uniqueid);
+		publish = 1;
 	}
 	ao2_unlock(endpoint);
-	endpoint_publish_snapshot(endpoint);
+	if (publish) {
+		endpoint_publish_snapshot(endpoint);
+	}
 }
 
 static void endpoint_cache_clear(void *data,
@@ -322,8 +326,7 @@
 	i = ao2_iterator_init(endpoint->channel_ids, 0);
 	while ((obj = ao2_iterator_next(&i))) {
 		RAII_VAR(char *, channel_id, obj, ao2_cleanup);
-		snapshot->channel_ids[snapshot->current_channels++] =
-			channel_id;
+		snapshot->channel_ids[snapshot->num_channels++] = channel_id;
 	}
 
 	ao2_ref(snapshot, +1);

Modified: team/dlee/endpoints/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/main/stasis_cache.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/main/stasis_cache.c (original)
+++ team/dlee/endpoints/main/stasis_cache.c Thu May  2 13:40:01 2013
@@ -365,7 +365,9 @@
 			update = update_create(topic, old_snapshot, NULL);
 			stasis_publish(caching_topic->topic, update);
 		} else {
-			ast_log(LOG_ERROR,
+			/* While this could be a problem, it's very likely to
+			 * happen with message forwarding */
+			ast_log(LOG_DEBUG,
 				"Attempting to remove an item from the cache that isn't there: %s %s\n",
 				stasis_message_type_name(clear->type), clear->id);
 		}

Modified: team/dlee/endpoints/main/stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/main/stasis_endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/main/stasis_endpoints.c (original)
+++ team/dlee/endpoints/main/stasis_endpoints.c Thu May  2 13:40:01 2013
@@ -143,7 +143,7 @@
 
 	channel_array = ast_json_object_get(json, "channels");
 	ast_assert(channel_array != NULL);
-	for (i = 0; i < snapshot->current_channels; ++i) {
+	for (i = 0; i < snapshot->num_channels; ++i) {
 		v = ast_json_array_append(channel_array,
 			ast_json_stringf("channel:%s",
 				snapshot->channel_ids[i]));
@@ -179,7 +179,7 @@
 
 	if (!endpoint_snapshot_type) {
 		endpoint_snapshot_type = stasis_message_type_create(
-			"ast_endpoint_snapshot_type");
+			"ast_endpoint_snapshot");
 	}
 
 	if (!endpoint_snapshot_type) {

Modified: team/dlee/endpoints/res/res_stasis_test.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/res/res_stasis_test.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/res/res_stasis_test.c (original)
+++ team/dlee/endpoints/res/res_stasis_test.c Thu May  2 13:40:01 2013
@@ -97,7 +97,28 @@
 	return sink;
 }
 
-void stasis_message_sink_cb(void *data, struct stasis_subscription *sub,
+/*!
+ * \brief Implementation of the stasis_message_sink_cb() callback.
+ *
+ * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
+ * it has to do with how we load modules.
+ *
+ * Modules have their own metadata compiled into them in the AST_MODULE_INFO()
+ * block. This includes dependency information in the \c nonoptreq field.
+ *
+ * Asterisk loads the module, inspects the field, then loads any needed
+ * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
+ * dlopen(), which defers binding function references until they are called.
+ *
+ * But when you take the address of a function, that function needs to be
+ * available at load time. So if some module used the address of
+ * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
+ * that module would fail to load.
+ *
+ * The stasis_message_sink_cb() function gives us a layer of indirection so that
+ * the initial lazy binding will still work as expected.
+ */
+static void message_sink_cb(void *data, struct stasis_subscription *sub,
 	struct stasis_topic *topic, struct stasis_message *message)
 {
 	struct stasis_message_sink *sink = data;
@@ -132,7 +153,13 @@
 	ast_cond_signal(&sink->cond);
 }
 
-int stasis_message_sink_wait_for(struct stasis_message_sink *sink,
+stasis_subscription_cb stasis_message_sink_cb(void)
+{
+	return message_sink_cb;
+}
+
+
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
 	int num_messages, int timeout_millis)
 {
 	struct timespec deadline = make_deadline(timeout_millis);
@@ -143,7 +170,8 @@
 
 		if (r == ETIMEDOUT) {
 			break;
-		} else if (r != 0) {
+		}
+		if (r != 0) {
 			ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
 				strerror(r));
 			break;
@@ -163,13 +191,59 @@
 
 		if (r == ETIMEDOUT) {
 			break;
-		} else {
+		}
+		if (r != 0) {
 			ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
 				strerror(r));
 			break;
 		}
 	}
 	return sink->num_messages;
+}
+
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+	stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
+{
+	struct timespec deadline = make_deadline(timeout_millis);
+
+	SCOPED_MUTEX(lock, &sink->lock);
+
+	/* wait for the start */
+	while (sink->num_messages < start + 1) {
+		int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+		if (r == ETIMEDOUT) {
+			/* Timed out waiting for the start */
+			return -1;
+		}
+		if (r != 0) {
+			ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+				strerror(r));
+			return -2;
+		}
+	}
+
+
+	while (!cmp_cb(sink->messages[start], data)) {
+		++start;
+
+		while (sink->num_messages < start + 1) {
+			int r = ast_cond_timedwait(&sink->cond,
+				&sink->lock, &deadline);
+
+			if (r == ETIMEDOUT) {
+				return -1;
+			}
+			if (r != 0) {
+				ast_log(LOG_ERROR,
+					"Unexpected condition error: %s\n",
+					strerror(r));
+				return -2;
+			}
+		}
+	}
+
+	return start;
 }
 
 struct stasis_message *stasis_test_message_create(void)

Modified: team/dlee/endpoints/tests/test_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/tests/test_endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/tests/test_endpoints.c (original)
+++ team/dlee/endpoints/tests/test_endpoints.c Thu May  2 13:40:01 2013
@@ -99,7 +99,7 @@
 	ast_test_validate(test, 0 == strcmp("test_res", snapshot->resource));
 	ast_test_validate(test, AST_ENDPOINT_UNKNOWN == snapshot->state);
 	ast_test_validate(test, -1 == snapshot->max_channels);
-	ast_test_validate(test, 0 == snapshot->current_channels);
+	ast_test_validate(test, 0 == snapshot->num_channels);
 
 	return AST_TEST_PASS;
 }

Modified: team/dlee/endpoints/tests/test_stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/tests/test_stasis_endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/tests/test_stasis_endpoints.c (original)
+++ team/dlee/endpoints/tests/test_stasis_endpoints.c Thu May  2 13:40:01 2013
@@ -38,28 +38,54 @@
 #include "asterisk/channel.h"
 #include "asterisk/endpoints.h"
 #include "asterisk/module.h"
+#include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stasis_test.h"
 #include "asterisk/test.h"
 
 static const char *test_category = "/stasis/endpoints/";
 
-static void safe_channel_release(struct ast_channel *chan)
+static void safe_channel_hangup(struct ast_channel *chan)
 {
 	if (!chan) {
 		return;
 	}
-	ast_channel_release(chan);
-}
-
-AST_TEST_DEFINE(channel_messages)
+	ast_hangup(chan);
+}
+
+static int cache_update(struct stasis_message *msg, const void *data) {
+	struct stasis_cache_update *update;
+	struct ast_endpoint_snapshot *snapshot;
+	const char *name = data;
+
+	if (stasis_cache_update_type() != stasis_message_type(msg)) {
+		return 0;
+	}
+
+	update = stasis_message_data(msg);
+	if (ast_endpoint_snapshot_type() != update->type) {
+		return 0;
+	}
+
+	snapshot = stasis_message_data(update->old_snapshot);
+	if (!snapshot) {
+		snapshot = stasis_message_data(update->new_snapshot);
+	}
+
+	return 0 == strcmp(name, snapshot->resource);
+}
+
+AST_TEST_DEFINE(state_changes)
 {
 	RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
-	RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_release);
-	RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
 	RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
-	int actual;
+	struct stasis_message *msg;
+	struct stasis_message_type *type;
+	struct ast_endpoint_snapshot *actual_snapshot;
+	struct stasis_cache_update *update;
+	int message_index;
 
 	switch (cmd) {
 	case TEST_INIT:
@@ -72,46 +98,200 @@
 		break;
 	}
 
-	uut = ast_endpoint_create("TEST", "test_res");
-	ast_test_validate(test, NULL != uut);
-
+	/* Subscribe to the cache topic */
 	sink = stasis_message_sink_create();
 	ast_test_validate(test, NULL != sink);
 
-	sub = stasis_subscribe(ast_endpoint_topic(uut), stasis_message_sink_cb,
-		sink);
+	sub = stasis_subscribe(
+		stasis_caching_get_topic(ast_endpoint_topic_all_cached()),
+		stasis_message_sink_cb(), sink);
 	ast_test_validate(test, NULL != sub);
 
-	chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", "test_res", "100",
+	uut = ast_endpoint_create("TEST", __func__);
+	ast_test_validate(test, NULL != uut);
+
+	/* Since the cache topic is a singleton (ew), it may have messages from
+	 * elsewheres that it's processing, or maybe even some final messages
+	 * from the prior test. We've got to wait_for our specific message,
+	 * instead of wait_for_count.
+	 */
+	message_index = stasis_message_sink_wait_for(sink, 0,
+		cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
+	ast_test_validate(test, 0 <= message_index);
+
+	/* First message should be a cache creation entry for our endpont */
+	msg = sink->messages[message_index];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, stasis_cache_update_type() == type);
+	update = stasis_message_data(msg);
+	ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
+	ast_test_validate(test, NULL == update->old_snapshot);
+	actual_snapshot = stasis_message_data(update->new_snapshot);
+	ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
+	ast_test_validate(test,
+		0 == strcmp(__func__, actual_snapshot->resource));
+
+	ast_endpoint_shutdown(uut);
+	uut = NULL;
+	message_index = stasis_message_sink_wait_for(sink, message_index + 1,
+		cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
+	ast_test_validate(test, 0 <= message_index);
+	/* Now we should have a cache removal entry */
+	msg = sink->messages[message_index];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, stasis_cache_update_type() == type);
+	update = stasis_message_data(msg);
+	ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
+	actual_snapshot = stasis_message_data(update->old_snapshot);
+	ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
+	ast_test_validate(test,
+		0 == strcmp(__func__, actual_snapshot->resource));
+	ast_test_validate(test, NULL == update->new_snapshot);
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache_clear)
+{
+	RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+	RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+	RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+	struct stasis_message *msg;
+	struct stasis_message_type *type;
+	struct ast_endpoint_snapshot *actual_snapshot;
+	int actual_count;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test endpoint setters";
+		info->description = "Test endpoint setters";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	uut = ast_endpoint_create("TEST", __func__);
+	ast_test_validate(test, NULL != uut);
+
+	sink = stasis_message_sink_create();
+	ast_test_validate(test, NULL != sink);
+
+	sub = stasis_subscribe(ast_endpoint_topic(uut),
+		stasis_message_sink_cb(), sink);
+	ast_test_validate(test, NULL != sub);
+
+	ast_endpoint_set_state(uut, AST_ENDPOINT_OFFLINE);
+	actual_count = stasis_message_sink_wait_for_count(sink, 1,
+		STASIS_SINK_DEFAULT_WAIT);
+	msg = sink->messages[0];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+	actual_snapshot = stasis_message_data(msg);
+	ast_test_validate(test, AST_ENDPOINT_OFFLINE == actual_snapshot->state);
+
+	ast_endpoint_set_max_channels(uut, 8675309);
+	actual_count = stasis_message_sink_wait_for_count(sink, 2,
+		STASIS_SINK_DEFAULT_WAIT);
+	msg = sink->messages[1];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+	actual_snapshot = stasis_message_data(msg);
+	ast_test_validate(test, 8675309 == actual_snapshot->max_channels);
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(channel_messages)
+{
+	RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+	RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+	RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+	struct stasis_message *msg;
+	struct stasis_message_type *type;
+	struct ast_endpoint_snapshot *actual_snapshot;
+	int actual_count;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test endpoint setters";
+		info->description = "Test endpoint setters";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	uut = ast_endpoint_create("TEST", __func__);
+	ast_test_validate(test, NULL != uut);
+
+	sink = stasis_message_sink_create();
+	ast_test_validate(test, NULL != sink);
+
+	sub = stasis_subscribe(ast_endpoint_topic(uut),
+		stasis_message_sink_cb(), sink);
+	ast_test_validate(test, NULL != sub);
+
+	chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", __func__, "100",
 		"100", "default", NULL, 0, "TEST/test_res");
 	ast_test_validate(test, NULL != chan);
 
 	ast_endpoint_add_channel(uut, chan);
 
-	actual = stasis_message_sink_wait_for(sink, 1, DEFAULT_WAIT_MILLIS);
-
-	for (actual = 0; actual < sink->num_messages; ++actual) {
-		struct stasis_message *msg = sink->messages[actual];
-		struct stasis_message_type *type = stasis_message_type(msg);
-		printf("%s\n", stasis_message_type_name(type));
-	}
-
-	ast_test_validate(test, 1 == actual);
-
-	safe_channel_release(chan);
+	actual_count = stasis_message_sink_wait_for_count(sink, 2,
+		STASIS_SINK_DEFAULT_WAIT);
+	ast_test_validate(test, 2 == actual_count);
+
+	msg = sink->messages[0];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, ast_channel_snapshot_type() == type);
+
+	msg = sink->messages[1];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+	actual_snapshot = stasis_message_data(msg);
+	ast_test_validate(test, 1 == actual_snapshot->num_channels);
+
+	safe_channel_hangup(chan);
 	chan = NULL;
 
+	actual_count = stasis_message_sink_wait_for_count(sink, 5,
+		STASIS_SINK_DEFAULT_WAIT);
+	ast_test_validate(test, 5 == actual_count);
+
+	msg = sink->messages[2];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, ast_channel_snapshot_type() == type);
+
+	msg = sink->messages[3];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, stasis_cache_clear_type() == type);
+
+	msg = sink->messages[4];
+	type = stasis_message_type(msg);
+	ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+	actual_snapshot = stasis_message_data(msg);
+	ast_test_validate(test, 0 == actual_snapshot->num_channels);
+
 	return AST_TEST_PASS;
 }
 
 static int unload_module(void)
 {
+	AST_TEST_UNREGISTER(state_changes);
+	AST_TEST_UNREGISTER(cache_clear);
 	AST_TEST_UNREGISTER(channel_messages);
 	return 0;
 }
 
 static int load_module(void)
 {
+	AST_TEST_REGISTER(state_changes);
+	AST_TEST_REGISTER(cache_clear);
 	AST_TEST_REGISTER(channel_messages);
 	return AST_MODULE_LOAD_SUCCESS;
 }




More information about the asterisk-commits mailing list