[asterisk-commits] dlee: branch dlee/stasis-http r384881 - in /team/dlee/stasis-http: ./ apps/ i...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Apr 8 08:34:33 CDT 2013
Author: dlee
Date: Mon Apr 8 08:34:29 2013
New Revision: 384881
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=384881
Log:
Merged revisions 384587-384823 from http://svn.asterisk.org/svn/asterisk/team/dlee/stasis-app
Modified:
team/dlee/stasis-http/ (props changed)
team/dlee/stasis-http/apps/app_stasis.c
team/dlee/stasis-http/include/asterisk/app_stasis.h
team/dlee/stasis-http/res/res_stasis_websocket.c
team/dlee/stasis-http/tests/test_app_stasis.c
Propchange: team/dlee/stasis-http/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Apr 8 08:34:29 2013
@@ -1,1 +1,1 @@
-/team/dlee/stasis-app:1-384506 /trunk:1-384488
+/team/dlee/stasis-app:1-384880 /trunk:1-384490
Modified: team/dlee/stasis-http/apps/app_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/apps/app_stasis.c?view=diff&rev=384881&r1=384880&r2=384881
==============================================================================
--- team/dlee/stasis-http/apps/app_stasis.c (original)
+++ team/dlee/stasis-http/apps/app_stasis.c Mon Apr 8 08:34:29 2013
@@ -48,13 +48,13 @@
<para>Name of the application to invoke.</para>
</parameter>
<parameter name="args">
- <para>Optional arguments for the application
- invocation.</para>
+ <para>Optional comma-delimited arguments for the
+ application invocation.</para>
</parameter>
</syntax>
<description>
<para>
- Invoke an external Stasis application.
+ Invoke a Stasis application.
</para>
</description>
</application>
@@ -111,6 +111,14 @@
char name[];
};
+static void app_dtor(void *obj)
+{
+ struct app *app = obj;
+
+ ao2_cleanup(app->data);
+ app->data = NULL;
+}
+
/*! Constructor for \ref app. */
static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
{
@@ -121,7 +129,7 @@
ast_assert(handler != NULL);
size = sizeof(*app) + strlen(name) + 1;
- app = ao2_alloc_options(size, NULL, AO2_ALLOC_OPT_LOCK_MUTEX);
+ app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
if (!app) {
return NULL;
@@ -129,12 +137,13 @@
strncpy(app->name, name, size - sizeof(*app));
app->handler = handler;
+ ao2_ref(data, +1);
app->data = data;
return app;
}
-/*! AO2 hash function for \ref stasis_app */
+/*! AO2 hash function for \ref app */
static int app_hash(const void *obj, const int flags)
{
const struct app *app = obj;
@@ -143,7 +152,7 @@
return ast_str_hash(name);
}
-/*! AO2 comparison function for \ref stasis_app */
+/*! AO2 comparison function for \ref app */
static int app_compare(void *lhs, void *rhs, int flags)
{
const struct app *lhs_app = lhs;
@@ -678,13 +687,14 @@
app->handler(app->data, app_name, msg);
app->handler = handler;
+ ao2_cleanup(app->data);
+ ao2_ref(data, +1);
app->data = data;
} else {
app = app_create(app_name, handler, data);
if (app) {
ao2_link_flags(apps, app, OBJ_NOLOCK);
} else {
- ast_log(LOG_ERROR, "Failed to allocate stasis_app\n");
return -1;
}
}
Modified: team/dlee/stasis-http/include/asterisk/app_stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/include/asterisk/app_stasis.h?view=diff&rev=384881&r1=384880&r2=384881
==============================================================================
--- team/dlee/stasis-http/include/asterisk/app_stasis.h (original)
+++ team/dlee/stasis-http/include/asterisk/app_stasis.h Mon Apr 8 08:34:29 2013
@@ -61,15 +61,18 @@
* \param app_name Name of the application being dispatched to.
* \param message Message to handle. (borrowed copy)
*/
-typedef void (*stasis_app_cb)(void *data, const char *app_name, struct ast_json *message);
+typedef void (*stasis_app_cb)(void *data, const char *app_name,
+ struct ast_json *message);
/*!
* \brief Register a new Stasis application.
+ *
* If an application is already registered with the given name, the old
* application is sent a 'replaced' message and unregistered.
+ *
* \param app_name Name of this application.
* \param handler Callback for application messages.
- * \param data Data blob to pass to the callback.
+ * \param data Data blob to pass to the callback. Must be AO2 managed.
* \return 0 for success
* \return -1 for error.
*/
Modified: team/dlee/stasis-http/res/res_stasis_websocket.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/res/res_stasis_websocket.c?view=diff&rev=384881&r1=384880&r2=384881
==============================================================================
--- team/dlee/stasis-http/res/res_stasis_websocket.c (original)
+++ team/dlee/stasis-http/res/res_stasis_websocket.c Mon Apr 8 08:34:29 2013
@@ -95,7 +95,7 @@
const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
if (strcmp(lhs_app->name, rhs_name) == 0) {
- return CMP_MATCH | CMP_STOP;
+ return CMP_MATCH;
} else {
return 0;
}
@@ -112,6 +112,64 @@
struct ao2_container *websocket_apps;
};
+static void session_dtor(void *obj)
+{
+ struct stasis_ws_session_info *session = obj;
+
+ /* session_shutdown should have been called before */
+ ast_assert(session->ws_session == NULL);
+ ast_assert(session->websocket_apps == NULL);
+}
+
+static struct stasis_ws_session_info *session_create(
+ struct ast_websocket *ws_session)
+{
+ RAII_VAR(struct stasis_ws_session_info *, session, NULL, ao2_cleanup);
+
+ session = ao2_alloc(sizeof(*session), session_dtor);
+
+ session->ws_session = ws_session;
+ session->websocket_apps =
+ ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
+
+ if (!session->websocket_apps) {
+ return NULL;
+ }
+
+ ao2_ref(session, +1);
+ return session;
+}
+
+/*!
+ * \brief Explicitly shutdown a session.
+ *
+ * An explicit shutdown is necessary, since stasis-app has a reference to this
+ * session. We also need to be sure to null out the \c ws_session field, since
+ * the websocket is about to go away.
+ *
+ * \param session Session info struct.
+ */
+static void session_shutdown(struct stasis_ws_session_info *session)
+{
+ struct ao2_iterator i;
+ struct websocket_app *app;
+ SCOPED_AO2LOCK(lock, session);
+
+ i = ao2_iterator_init(session->websocket_apps, 0);
+ while ((app = ao2_iterator_next(&i))) {
+ stasis_app_unregister(app->name);
+ ao2_cleanup(app);
+ }
+ ao2_iterator_destroy(&i);
+ ao2_cleanup(session->websocket_apps);
+
+ session->websocket_apps = NULL;
+ session->ws_session = NULL;
+}
+
+/*!
+ * \brief Callback handler for Stasis application messages.
+ */
static void app_handler(void *data, const char *app_name,
struct ast_json *message)
{
@@ -120,16 +178,31 @@
res = ast_json_object_set(message, "application",
ast_json_string_create(app_name));
- ast_assert(res == 0);
-
- websocket_write_json(session->ws_session, message);
-}
-
-static int register_apps(struct stasis_ws_session_info *session,
- const char *app_list)
+ if(res != 0) {
+ return;
+ }
+
+ ao2_lock(session);
+ if (session->ws_session) {
+ websocket_write_json(session->ws_session, message);
+ }
+ ao2_unlock(session);
+}
+
+/*!
+ * \brief Register for all of the apps given.
+ * \param session Session info struct.
+ * \param app_list Comma seperated list of app names to register.
+ */
+static int session_register_apps(struct stasis_ws_session_info *session,
+ const char *app_list)
{
RAII_VAR(char *, to_free, NULL, ast_free);
char *apps, *app_name;
+ SCOPED_AO2LOCK(lock, session);
+
+ ast_assert(session->ws_session != NULL);
+ ast_assert(session->websocket_apps != NULL);
to_free = apps = ast_strdup(app_list);
if (!apps) {
@@ -152,43 +225,58 @@
return 0;
}
-static void websocket_callback(struct ast_websocket *session,
+static void websocket_callback(struct ast_websocket *ws_session,
struct ast_variable *parameters,
struct ast_variable *headers)
{
- struct stasis_ws_session_info stasis_session = {};
- struct ao2_iterator i;
- struct websocket_app *app;
- struct ast_variable *param;
+ RAII_VAR(struct stasis_ws_session_info *, stasis_session, NULL, ao2_cleanup);
+ struct ast_variable *param = NULL;
int res;
ast_debug(3, "Stasis web socket connection\n");
- if (ast_websocket_set_nonblock(session) != 0) {
+ if (ast_websocket_set_nonblock(ws_session) != 0) {
ast_log(LOG_ERROR,
"Stasis web socket failed to set nonblock; closing\n");
goto end;
}
- stasis_session.websocket_apps =
- ao2_container_alloc(APPS_NUM_BUCKETS, hash_app, compare_app);
- stasis_session.ws_session = session;
+ stasis_session = session_create(ws_session);
+
+ if (!stasis_session) {
+ websocket_write_json(ws_session, oom_json);
+ goto end;
+ }
for (param = parameters; param; param = param->next) {
if (strcmp(param->name, "app") == 0) {
- int ret = register_apps(&stasis_session, param->value);
+ int ret = session_register_apps(
+ stasis_session, param->value);
if (ret != 0) {
goto end;
}
}
}
- while ((res = ast_wait_for_input(ast_websocket_fd(session), -1)) > 0) {
+ if (ao2_container_count(stasis_session->websocket_apps) == 0) {
+ RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
+
+ msg = ast_json_pack("{s: s, s: [s]}",
+ "error", "MissingParams",
+ "params", "app");
+ if (msg) {
+ websocket_write_json(ws_session, msg);
+ }
+
+ goto end;
+ }
+
+ while ((res = ast_wait_for_input(ast_websocket_fd(ws_session), -1)) > 0) {
char *payload;
uint64_t payload_len;
enum ast_websocket_opcode opcode;
int fragmented;
- int read = ast_websocket_read(session, &payload, &payload_len,
+ int read = ast_websocket_read(ws_session, &payload, &payload_len,
&opcode, &fragmented);
if (read) {
@@ -203,24 +291,16 @@
}
end:
- i = ao2_iterator_init(stasis_session.websocket_apps, 0);
- while ((app = ao2_iterator_next(&i))) {
- stasis_app_unregister(app->name);
- ao2_cleanup(app);
- }
- ao2_iterator_destroy(&i);
- ao2_cleanup(stasis_session.websocket_apps);
-
- ast_websocket_unref(session);
+ session_shutdown(stasis_session);
+ ast_websocket_unref(ws_session);
}
static int load_module(void)
{
int r = 0;
- oom_json = ast_json_pack(
- "{s: s}",
- "error", "Out of memory");
+ oom_json = ast_json_pack("{s: s}",
+ "error", "OutOfMemory");
if (!oom_json) {
/* ironic */
return AST_MODULE_LOAD_FAILURE;
Modified: team/dlee/stasis-http/tests/test_app_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-http/tests/test_app_stasis.c?view=diff&rev=384881&r1=384880&r2=384881
==============================================================================
--- team/dlee/stasis-http/tests/test_app_stasis.c (original)
+++ team/dlee/stasis-http/tests/test_app_stasis.c Mon Apr 8 08:34:29 2013
@@ -65,20 +65,24 @@
struct ast_json *messages;
};
+static void app_data_dtor(void *obj)
+{
+ struct app_data *actual = obj;
+
+ ast_json_unref(actual->messages);
+ actual->messages = NULL;
+}
+
static struct app_data *app_data_create(void)
{
- struct app_data *res = ast_calloc(1, sizeof(struct app_data));
+ struct app_data *res = ao2_alloc(sizeof(struct app_data), app_data_dtor);
+
+ if (!res) {
+ return NULL;
+ }
+
res->messages = ast_json_array_create();
return res;
-}
-
-static void app_data_dtor(struct app_data *actual)
-{
- if (actual) {
- ast_json_unref(actual->messages);
- actual->messages = NULL;
- ast_free(actual);
- }
}
static void test_handler(void *data, const char *app_name, struct ast_json *message)
@@ -92,7 +96,7 @@
AST_TEST_DEFINE(app_invoke_one)
{
- RAII_VAR(struct app_data *, app_data, NULL, app_data_dtor);
+ RAII_VAR(struct app_data *, app_data, NULL, ao2_cleanup);
RAII_VAR(char *, app_name, NULL, stasis_app_unregister);
RAII_VAR(struct ast_json *, expected_message, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
@@ -127,8 +131,8 @@
AST_TEST_DEFINE(app_replaced)
{
- RAII_VAR(struct app_data *, app_data1, NULL, app_data_dtor);
- RAII_VAR(struct app_data *, app_data2, NULL, app_data_dtor);
+ RAII_VAR(struct app_data *, app_data1, NULL, ao2_cleanup);
+ RAII_VAR(struct app_data *, app_data2, NULL, ao2_cleanup);
RAII_VAR(char *, app_name, NULL, stasis_app_unregister);
RAII_VAR(struct ast_json *, expected_message1, NULL, ast_json_unref);
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
More information about the asterisk-commits
mailing list