[asterisk-commits] dlee: branch dlee/endpoints r387475 - in /team/dlee/endpoints: include/asteri...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu May 2 13:40:03 CDT 2013
Author: dlee
Date: Thu May 2 13:40:01 2013
New Revision: 387475
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=387475
Log:
Unit tests pass
Modified:
team/dlee/endpoints/include/asterisk/stasis_endpoints.h
team/dlee/endpoints/include/asterisk/stasis_test.h
team/dlee/endpoints/main/endpoints.c
team/dlee/endpoints/main/stasis_cache.c
team/dlee/endpoints/main/stasis_endpoints.c
team/dlee/endpoints/res/res_stasis_test.c
team/dlee/endpoints/tests/test_endpoints.c
team/dlee/endpoints/tests/test_stasis_endpoints.c
Modified: team/dlee/endpoints/include/asterisk/stasis_endpoints.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/include/asterisk/stasis_endpoints.h?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/include/asterisk/stasis_endpoints.h (original)
+++ team/dlee/endpoints/include/asterisk/stasis_endpoints.h Thu May 2 13:40:01 2013
@@ -60,7 +60,7 @@
*/
int max_channels;
/*! Number of channels currently active on this endpoint */
- int current_channels;
+ int num_channels;
/*! Channel ids */
char *channel_ids[];
};
Modified: team/dlee/endpoints/include/asterisk/stasis_test.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/include/asterisk/stasis_test.h?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/include/asterisk/stasis_test.h (original)
+++ team/dlee/endpoints/include/asterisk/stasis_test.h Thu May 2 13:40:01 2013
@@ -28,7 +28,7 @@
#include "asterisk/lock.h"
#include "asterisk/stasis.h"
-#define DEFAULT_WAIT_MILLIS 5000
+#define STASIS_SINK_DEFAULT_WAIT 5000
/*! \brief Structure that collects messages from a topic */
struct stasis_message_sink {
@@ -58,10 +58,15 @@
/*!
* \brief Topic callback to receive messages.
- * See \ref stasis_subscription_cb
+ *
+ * We return a function pointer instead of simply exposing the function because
+ * of the vagaries of dlopen(), \c RTLD_LAZY, and function pointers. See the
+ * comment on the implementation for details why.
+ *
+ * \return Function pointer to \ref stasis_message_sink's message handling
+ * function
*/
-void stasis_message_sink_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message);
+stasis_subscription_cb stasis_message_sink_cb(void);
/*!
* \brief Wait for a sink's num_messages field to reach a certain level.
@@ -74,8 +79,24 @@
* \return Actual sink->num_messages value at return.
* If this is < \a num_messages, then the timeout expired.
*/
-int stasis_message_sink_wait_for(struct stasis_message_sink *sink,
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
int num_messages, int timeout_millis);
+
+typedef int (*stasis_wait_cb)(struct stasis_message *msg, const void *data);
+
+/*!
+ * \brief Wait for a message that matches the given criteria.
+ *
+ * \param sink Sink to wait on.
+ * \param start Index of message to start with.
+ * \param cmp_cb comparison function. This returns true (non-zero) on match
+ * and false (zero) on match.
+ * \param timeout_millis Number of milliseconds to wait.
+ * \return Index of the matching message.
+ * \return Negative for no match.
+ */
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+ stasis_wait_cb cmp_cb, const void *data, int timeout_millis);
/*!
* \brief Ensures that no new messages are received.
Modified: team/dlee/endpoints/main/endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/main/endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/main/endpoints.c (original)
+++ team/dlee/endpoints/main/endpoints.c Thu May 2 13:40:01 2013
@@ -119,6 +119,7 @@
struct ast_endpoint *endpoint = data;
struct ast_channel_snapshot *snapshot = stasis_message_data(message);
RAII_VAR(char *, existing_id, NULL, ao2_cleanup);
+ int publish = 0;
ast_assert(endpoint != NULL);
ast_assert(snapshot != NULL);
@@ -129,9 +130,12 @@
if (!existing_id) {
ast_str_container_add(endpoint->channel_ids,
snapshot->uniqueid);
+ publish = 1;
}
ao2_unlock(endpoint);
- endpoint_publish_snapshot(endpoint);
+ if (publish) {
+ endpoint_publish_snapshot(endpoint);
+ }
}
static void endpoint_cache_clear(void *data,
@@ -322,8 +326,7 @@
i = ao2_iterator_init(endpoint->channel_ids, 0);
while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(char *, channel_id, obj, ao2_cleanup);
- snapshot->channel_ids[snapshot->current_channels++] =
- channel_id;
+ snapshot->channel_ids[snapshot->num_channels++] = channel_id;
}
ao2_ref(snapshot, +1);
Modified: team/dlee/endpoints/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/main/stasis_cache.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/main/stasis_cache.c (original)
+++ team/dlee/endpoints/main/stasis_cache.c Thu May 2 13:40:01 2013
@@ -365,7 +365,9 @@
update = update_create(topic, old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
} else {
- ast_log(LOG_ERROR,
+ /* While this could be a problem, it's very likely to
+ * happen with message forwarding */
+ ast_log(LOG_DEBUG,
"Attempting to remove an item from the cache that isn't there: %s %s\n",
stasis_message_type_name(clear->type), clear->id);
}
Modified: team/dlee/endpoints/main/stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/main/stasis_endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/main/stasis_endpoints.c (original)
+++ team/dlee/endpoints/main/stasis_endpoints.c Thu May 2 13:40:01 2013
@@ -143,7 +143,7 @@
channel_array = ast_json_object_get(json, "channels");
ast_assert(channel_array != NULL);
- for (i = 0; i < snapshot->current_channels; ++i) {
+ for (i = 0; i < snapshot->num_channels; ++i) {
v = ast_json_array_append(channel_array,
ast_json_stringf("channel:%s",
snapshot->channel_ids[i]));
@@ -179,7 +179,7 @@
if (!endpoint_snapshot_type) {
endpoint_snapshot_type = stasis_message_type_create(
- "ast_endpoint_snapshot_type");
+ "ast_endpoint_snapshot");
}
if (!endpoint_snapshot_type) {
Modified: team/dlee/endpoints/res/res_stasis_test.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/res/res_stasis_test.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/res/res_stasis_test.c (original)
+++ team/dlee/endpoints/res/res_stasis_test.c Thu May 2 13:40:01 2013
@@ -97,7 +97,28 @@
return sink;
}
-void stasis_message_sink_cb(void *data, struct stasis_subscription *sub,
+/*!
+ * \brief Implementation of the stasis_message_sink_cb() callback.
+ *
+ * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
+ * it has to do with how we load modules.
+ *
+ * Modules have their own metadata compiled into them in the AST_MODULE_INFO()
+ * block. This includes dependency information in the \c nonoptreq field.
+ *
+ * Asterisk loads the module, inspects the field, then loads any needed
+ * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
+ * dlopen(), which defers binding function references until they are called.
+ *
+ * But when you take the address of a function, that function needs to be
+ * available at load time. So if some module used the address of
+ * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
+ * that module would fail to load.
+ *
+ * The stasis_message_sink_cb() function gives us a layer of indirection so that
+ * the initial lazy binding will still work as expected.
+ */
+static void message_sink_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
{
struct stasis_message_sink *sink = data;
@@ -132,7 +153,13 @@
ast_cond_signal(&sink->cond);
}
-int stasis_message_sink_wait_for(struct stasis_message_sink *sink,
+stasis_subscription_cb stasis_message_sink_cb(void)
+{
+ return message_sink_cb;
+}
+
+
+int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
int num_messages, int timeout_millis)
{
struct timespec deadline = make_deadline(timeout_millis);
@@ -143,7 +170,8 @@
if (r == ETIMEDOUT) {
break;
- } else if (r != 0) {
+ }
+ if (r != 0) {
ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
strerror(r));
break;
@@ -163,13 +191,59 @@
if (r == ETIMEDOUT) {
break;
- } else {
+ }
+ if (r != 0) {
ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
strerror(r));
break;
}
}
return sink->num_messages;
+}
+
+int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
+ stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
+{
+ struct timespec deadline = make_deadline(timeout_millis);
+
+ SCOPED_MUTEX(lock, &sink->lock);
+
+ /* wait for the start */
+ while (sink->num_messages < start + 1) {
+ int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ /* Timed out waiting for the start */
+ return -1;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
+ strerror(r));
+ return -2;
+ }
+ }
+
+
+ while (!cmp_cb(sink->messages[start], data)) {
+ ++start;
+
+ while (sink->num_messages < start + 1) {
+ int r = ast_cond_timedwait(&sink->cond,
+ &sink->lock, &deadline);
+
+ if (r == ETIMEDOUT) {
+ return -1;
+ }
+ if (r != 0) {
+ ast_log(LOG_ERROR,
+ "Unexpected condition error: %s\n",
+ strerror(r));
+ return -2;
+ }
+ }
+ }
+
+ return start;
}
struct stasis_message *stasis_test_message_create(void)
Modified: team/dlee/endpoints/tests/test_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/tests/test_endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/tests/test_endpoints.c (original)
+++ team/dlee/endpoints/tests/test_endpoints.c Thu May 2 13:40:01 2013
@@ -99,7 +99,7 @@
ast_test_validate(test, 0 == strcmp("test_res", snapshot->resource));
ast_test_validate(test, AST_ENDPOINT_UNKNOWN == snapshot->state);
ast_test_validate(test, -1 == snapshot->max_channels);
- ast_test_validate(test, 0 == snapshot->current_channels);
+ ast_test_validate(test, 0 == snapshot->num_channels);
return AST_TEST_PASS;
}
Modified: team/dlee/endpoints/tests/test_stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/endpoints/tests/test_stasis_endpoints.c?view=diff&rev=387475&r1=387474&r2=387475
==============================================================================
--- team/dlee/endpoints/tests/test_stasis_endpoints.c (original)
+++ team/dlee/endpoints/tests/test_stasis_endpoints.c Thu May 2 13:40:01 2013
@@ -38,28 +38,54 @@
#include "asterisk/channel.h"
#include "asterisk/endpoints.h"
#include "asterisk/module.h"
+#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_test.h"
#include "asterisk/test.h"
static const char *test_category = "/stasis/endpoints/";
-static void safe_channel_release(struct ast_channel *chan)
+static void safe_channel_hangup(struct ast_channel *chan)
{
if (!chan) {
return;
}
- ast_channel_release(chan);
-}
-
-AST_TEST_DEFINE(channel_messages)
+ ast_hangup(chan);
+}
+
+static int cache_update(struct stasis_message *msg, const void *data) {
+ struct stasis_cache_update *update;
+ struct ast_endpoint_snapshot *snapshot;
+ const char *name = data;
+
+ if (stasis_cache_update_type() != stasis_message_type(msg)) {
+ return 0;
+ }
+
+ update = stasis_message_data(msg);
+ if (ast_endpoint_snapshot_type() != update->type) {
+ return 0;
+ }
+
+ snapshot = stasis_message_data(update->old_snapshot);
+ if (!snapshot) {
+ snapshot = stasis_message_data(update->new_snapshot);
+ }
+
+ return 0 == strcmp(name, snapshot->resource);
+}
+
+AST_TEST_DEFINE(state_changes)
{
RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
- RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_release);
- RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
- int actual;
+ struct stasis_message *msg;
+ struct stasis_message_type *type;
+ struct ast_endpoint_snapshot *actual_snapshot;
+ struct stasis_cache_update *update;
+ int message_index;
switch (cmd) {
case TEST_INIT:
@@ -72,46 +98,200 @@
break;
}
- uut = ast_endpoint_create("TEST", "test_res");
- ast_test_validate(test, NULL != uut);
-
+ /* Subscribe to the cache topic */
sink = stasis_message_sink_create();
ast_test_validate(test, NULL != sink);
- sub = stasis_subscribe(ast_endpoint_topic(uut), stasis_message_sink_cb,
- sink);
+ sub = stasis_subscribe(
+ stasis_caching_get_topic(ast_endpoint_topic_all_cached()),
+ stasis_message_sink_cb(), sink);
ast_test_validate(test, NULL != sub);
- chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", "test_res", "100",
+ uut = ast_endpoint_create("TEST", __func__);
+ ast_test_validate(test, NULL != uut);
+
+ /* Since the cache topic is a singleton (ew), it may have messages from
+ * elsewheres that it's processing, or maybe even some final messages
+ * from the prior test. We've got to wait_for our specific message,
+ * instead of wait_for_count.
+ */
+ message_index = stasis_message_sink_wait_for(sink, 0,
+ cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 0 <= message_index);
+
+ /* First message should be a cache creation entry for our endpont */
+ msg = sink->messages[message_index];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, stasis_cache_update_type() == type);
+ update = stasis_message_data(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
+ ast_test_validate(test, NULL == update->old_snapshot);
+ actual_snapshot = stasis_message_data(update->new_snapshot);
+ ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
+ ast_test_validate(test,
+ 0 == strcmp(__func__, actual_snapshot->resource));
+
+ ast_endpoint_shutdown(uut);
+ uut = NULL;
+ message_index = stasis_message_sink_wait_for(sink, message_index + 1,
+ cache_update, __func__, STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 0 <= message_index);
+ /* Now we should have a cache removal entry */
+ msg = sink->messages[message_index];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, stasis_cache_update_type() == type);
+ update = stasis_message_data(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == update->type);
+ actual_snapshot = stasis_message_data(update->old_snapshot);
+ ast_test_validate(test, 0 == strcmp("TEST", actual_snapshot->tech));
+ ast_test_validate(test,
+ 0 == strcmp(__func__, actual_snapshot->resource));
+ ast_test_validate(test, NULL == update->new_snapshot);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(cache_clear)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ struct stasis_message *msg;
+ struct stasis_message_type *type;
+ struct ast_endpoint_snapshot *actual_snapshot;
+ int actual_count;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test endpoint setters";
+ info->description = "Test endpoint setters";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ uut = ast_endpoint_create("TEST", __func__);
+ ast_test_validate(test, NULL != uut);
+
+ sink = stasis_message_sink_create();
+ ast_test_validate(test, NULL != sink);
+
+ sub = stasis_subscribe(ast_endpoint_topic(uut),
+ stasis_message_sink_cb(), sink);
+ ast_test_validate(test, NULL != sub);
+
+ ast_endpoint_set_state(uut, AST_ENDPOINT_OFFLINE);
+ actual_count = stasis_message_sink_wait_for_count(sink, 1,
+ STASIS_SINK_DEFAULT_WAIT);
+ msg = sink->messages[0];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, AST_ENDPOINT_OFFLINE == actual_snapshot->state);
+
+ ast_endpoint_set_max_channels(uut, 8675309);
+ actual_count = stasis_message_sink_wait_for_count(sink, 2,
+ STASIS_SINK_DEFAULT_WAIT);
+ msg = sink->messages[1];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, 8675309 == actual_snapshot->max_channels);
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(channel_messages)
+{
+ RAII_VAR(struct ast_endpoint *, uut, NULL, ast_endpoint_shutdown);
+ RAII_VAR(struct ast_channel *, chan, NULL, safe_channel_hangup);
+ RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ struct stasis_message *msg;
+ struct stasis_message_type *type;
+ struct ast_endpoint_snapshot *actual_snapshot;
+ int actual_count;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test endpoint setters";
+ info->description = "Test endpoint setters";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ uut = ast_endpoint_create("TEST", __func__);
+ ast_test_validate(test, NULL != uut);
+
+ sink = stasis_message_sink_create();
+ ast_test_validate(test, NULL != sink);
+
+ sub = stasis_subscribe(ast_endpoint_topic(uut),
+ stasis_message_sink_cb(), sink);
+ ast_test_validate(test, NULL != sub);
+
+ chan = ast_channel_alloc(0, AST_STATE_DOWN, "100", __func__, "100",
"100", "default", NULL, 0, "TEST/test_res");
ast_test_validate(test, NULL != chan);
ast_endpoint_add_channel(uut, chan);
- actual = stasis_message_sink_wait_for(sink, 1, DEFAULT_WAIT_MILLIS);
-
- for (actual = 0; actual < sink->num_messages; ++actual) {
- struct stasis_message *msg = sink->messages[actual];
- struct stasis_message_type *type = stasis_message_type(msg);
- printf("%s\n", stasis_message_type_name(type));
- }
-
- ast_test_validate(test, 1 == actual);
-
- safe_channel_release(chan);
+ actual_count = stasis_message_sink_wait_for_count(sink, 2,
+ STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 2 == actual_count);
+
+ msg = sink->messages[0];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_channel_snapshot_type() == type);
+
+ msg = sink->messages[1];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, 1 == actual_snapshot->num_channels);
+
+ safe_channel_hangup(chan);
chan = NULL;
+ actual_count = stasis_message_sink_wait_for_count(sink, 5,
+ STASIS_SINK_DEFAULT_WAIT);
+ ast_test_validate(test, 5 == actual_count);
+
+ msg = sink->messages[2];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_channel_snapshot_type() == type);
+
+ msg = sink->messages[3];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, stasis_cache_clear_type() == type);
+
+ msg = sink->messages[4];
+ type = stasis_message_type(msg);
+ ast_test_validate(test, ast_endpoint_snapshot_type() == type);
+ actual_snapshot = stasis_message_data(msg);
+ ast_test_validate(test, 0 == actual_snapshot->num_channels);
+
return AST_TEST_PASS;
}
static int unload_module(void)
{
+ AST_TEST_UNREGISTER(state_changes);
+ AST_TEST_UNREGISTER(cache_clear);
AST_TEST_UNREGISTER(channel_messages);
return 0;
}
static int load_module(void)
{
+ AST_TEST_REGISTER(state_changes);
+ AST_TEST_REGISTER(cache_clear);
AST_TEST_REGISTER(channel_messages);
return AST_MODULE_LOAD_SUCCESS;
}
More information about the asterisk-commits
mailing list