[asterisk-commits] mjordan: branch mjordan/12-messaging r418447 - in /team/mjordan/12-messaging:...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sat Jul 12 22:21:03 CDT 2014
Author: mjordan
Date: Sat Jul 12 22:21:01 2014
New Revision: 418447
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=418447
Log:
Fix documentation, add res_stasis messaging
Added:
team/mjordan/12-messaging/res/stasis/messaging.c (with props)
team/mjordan/12-messaging/res/stasis/messaging.h (with props)
Modified:
team/mjordan/12-messaging/tests/test_message.c
Added: team/mjordan/12-messaging/res/stasis/messaging.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/12-messaging/res/stasis/messaging.c?view=auto&rev=418447
==============================================================================
--- team/mjordan/12-messaging/res/stasis/messaging.c (added)
+++ team/mjordan/12-messaging/res/stasis/messaging.c Sat Jul 12 22:21:01 2014
@@ -1,0 +1,531 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan at digium.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/message.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/vector.h"
+#include "asterisk/lock.h"
+#include "asterisk/utils.h"
+#include "asterisk/test.h"
+#include "messaging.h"
+
+/*!
+ * \brief Number of buckets for the \ref endpoint_subscriptions container
+ *
+ * Passed as the bucket count when messaging_init() allocates the hash
+ * container.
+ */
+#define ENDPOINTS_NUM_BUCKETS 127
+
+/*! \brief Storage object for an application
+ *
+ * Allocated by application_tuple_alloc() with extra room after the
+ * structure for the application name.
+ */
+struct application_tuple {
+ /*! ao2 ref counted private object to pass to the callback (a reference is taken at allocation and dropped by the destructor) */
+ void *pvt;
+ /*! The callback to call when this application has a message */
+ message_received_cb callback;
+ /*! The name (key) of the application, stored inline as a flexible array member */
+ char app_name[];
+};
+
+/*! \brief A subscription to some endpoint or technology
+ *
+ * Allocated by message_subscription_alloc() with extra room after the
+ * structure for the token.
+ */
+struct message_subscription {
+ /*! The applications that have subscribed to this endpoint or tech (each entry is a referenced \c application_tuple released by the destructor) */
+ AST_VECTOR(, struct application_tuple *) applications;
+ /*! The name of this endpoint or tech, stored inline as a flexible array member */
+ char token[];
+};
+
+/*! \brief The subscriptions to endpoints
+ *
+ * Hash container keyed on \c message_subscription.token; created in
+ * messaging_init() and released in messaging_cleanup().
+ */
+static struct ao2_container *endpoint_subscriptions;
+
+/*!
+ * \brief The subscriptions to technologies
+ *
+ * \note These are stored separately from standard endpoints, given how
+ * relatively few of them there are.
+ *
+ * \note Access is guarded by \c tech_subscriptions_lock.
+ */
+static AST_VECTOR(,struct message_subscription *) tech_subscriptions;
+
+/*! \brief RWLock for \c tech_subscriptions
+ *
+ * Read-locked while the vector is scanned, write-locked while entries are
+ * appended or removed.
+ */
+static ast_rwlock_t tech_subscriptions_lock;
+
+/*!
+ * \internal
+ * \brief ao2 destructor for \c application_tuple
+ *
+ * Releases the reference on the private object that the tuple took when
+ * it was allocated.
+ *
+ * \param obj The \c application_tuple being destroyed
+ */
+static void application_tuple_dtor(void *obj)
+{
+	struct application_tuple *doomed = obj;
+	void *held_pvt = doomed->pvt;
+
+	ao2_cleanup(held_pvt);
+}
+
+/*!
+ * \internal
+ * \brief Constructor for \c application_tuple
+ *
+ * \param app_name The name of the application; copied into the tuple
+ * \param callback The callback to invoke for this application; must not be NULL
+ * \param pvt ao2 ref counted object handed to \c callback; the tuple takes
+ *        its own reference to it
+ *
+ * \retval NULL on allocation failure
+ * \retval A new ao2 ref counted \c application_tuple on success
+ */
+static struct application_tuple *application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt)
+{
+	struct application_tuple *tuple;
+	size_t size = sizeof(*tuple) + strlen(app_name) + 1;
+
+	ast_assert(callback != NULL);
+
+	/*
+	 * The last argument of ao2_t_alloc() is the debug tag, not the
+	 * allocation options. The lock option has to be passed through the
+	 * _options variant to actually take effect.
+	 */
+	tuple = ao2_t_alloc_options(size, application_tuple_dtor,
+		AO2_ALLOC_OPT_LOCK_NOLOCK, "Allocate Stasis application tuple");
+	if (!tuple) {
+		return NULL;
+	}
+
+	strcpy(tuple->app_name, app_name); /* Safe: size accounts for the name and its terminator */
+	tuple->pvt = ao2_bump(pvt);
+	tuple->callback = callback;
+
+	return tuple;
+}
+
+/*!
+ * \internal
+ * \brief ao2 destructor for \ref message_subscription
+ *
+ * Releases every \c application_tuple still held in the applications
+ * vector, then frees the vector storage itself.
+ *
+ * \param obj The \c message_subscription being destroyed
+ */
+static void message_subscription_dtor(void *obj)
+{
+	struct message_subscription *doomed = obj;
+	int idx = 0;
+
+	while (idx < AST_VECTOR_SIZE(&doomed->applications)) {
+		struct application_tuple *entry = AST_VECTOR_GET(&doomed->applications, idx);
+
+		ao2_cleanup(entry);
+		idx++;
+	}
+
+	AST_VECTOR_FREE(&doomed->applications);
+}
+
+/*!
+ * \internal
+ * \brief Constructor for \ref message_subscription
+ *
+ * \param token The endpoint or technology name; copied into the subscription
+ *
+ * \retval NULL on allocation failure
+ * \retval A new ao2 ref counted \c message_subscription on success
+ */
+static struct message_subscription *message_subscription_alloc(const char *token)
+{
+	struct message_subscription *sub;
+	size_t size = sizeof(*sub) + strlen(token) + 1;
+
+	/*
+	 * The last argument of ao2_t_alloc() is the debug tag, not the
+	 * allocation options. The lock option has to be passed through the
+	 * _options variant to actually take effect.
+	 */
+	sub = ao2_t_alloc_options(size, message_subscription_dtor,
+		AO2_ALLOC_OPT_LOCK_RWLOCK, "Allocate Stasis message subscription");
+	if (!sub) {
+		return NULL;
+	}
+	strcpy(sub->token, token); /* Safe: size accounts for the token and its terminator */
+
+	return sub;
+}
+
+/*!
+ * \internal
+ * \brief AO2 hash function for \ref message_subscription
+ *
+ * \param obj Either a string key or a \c message_subscription, as selected
+ *        by \c flags
+ * \param flags ao2 search flags; only OBJ_SEARCH_KEY and OBJ_SEARCH_OBJECT
+ *        can be hashed
+ *
+ * \return The string hash of the token, or 0 for an unsupported search type
+ */
+static int message_subscription_hash_cb(const void *obj, const int flags)
+{
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		/* obj is the token string itself */
+		return ast_str_hash(obj);
+	case OBJ_SEARCH_OBJECT:
+		return ast_str_hash(((const struct message_subscription *) obj)->token);
+	default:
+		/* Hash can only work on something with a full key. */
+		ast_assert(0);
+		return 0;
+	}
+}
+
+/*!
+ * \internal
+ * \brief AO2 comparison function for \ref message_subscription
+ *
+ * \param obj The \c message_subscription stored in the container
+ * \param arg A \c message_subscription, a full token, or a token prefix,
+ *        as selected by \c flags
+ * \param flags ao2 search flags
+ *
+ * \retval CMP_MATCH if the tokens match (or the search type carries no key)
+ * \retval 0 otherwise
+ */
+static int message_subscription_compare_cb(void *obj, void *arg, int flags)
+{
+	const struct message_subscription *stored = obj;
+	const char *wanted;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		wanted = ((const struct message_subscription *) arg)->token;
+		return strcmp(stored->token, wanted) ? 0 : CMP_MATCH;
+	case OBJ_SEARCH_KEY:
+		wanted = arg;
+		return strcmp(stored->token, wanted) ? 0 : CMP_MATCH;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/*
+		 * strlen() runs for every comparison; a partial key struct
+		 * carrying the length would avoid that.
+		 */
+		wanted = arg;
+		return strncmp(stored->token, wanted, strlen(wanted)) ? 0 : CMP_MATCH;
+	default:
+		/*
+		 * What arg points to is specific to the traversal callback
+		 * and has no special meaning to astobj2, so everything matches.
+		 */
+		return CMP_MATCH;
+	}
+}
+
+/*!
+ * \internal
+ * \brief Convert a \c ast_msg To/From URI to a Stasis endpoint name
+ *
+ * Writes "tech" when the message has no endpoint portion and
+ * "tech/endpoint" otherwise.
+ *
+ * \param msg The message to inspect
+ * \param buf Destination buffer
+ * \param len Size of \c buf in bytes
+ */
+static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len)
+{
+	const char *resource = ast_msg_get_endpoint(msg);
+
+	if (ast_strlen_zero(resource)) {
+		snprintf(buf, len, "%s", ast_msg_get_tech(msg));
+	} else {
+		snprintf(buf, len, "%s/%s", ast_msg_get_tech(msg), resource);
+	}
+}
+
+/*! \internal
+ * \brief Callback from the \c message API that determines if we can handle
+ * this message
+ *
+ * A message can be handled when a technology subscription's token is a
+ * case-insensitive prefix of the message's endpoint name, or when an
+ * endpoint subscription exists for exactly that endpoint name.
+ *
+ * \param msg The message to check
+ *
+ * \retval 1 a subscription exists for the message
+ * \retval 0 no subscription exists
+ */
+static int has_destination_cb(const struct ast_msg *msg)
+{
+	struct message_subscription *sub;
+	int i;
+	char buf[256];
+
+	msg_to_endpoint(msg, buf, sizeof(buf));
+
+	ast_rwlock_rdlock(&tech_subscriptions_lock);
+	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+		sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+		if (sub && !strncasecmp(sub->token, buf, strlen(sub->token))) {
+			/* Only existence matters here, so no reference is taken */
+			ast_rwlock_unlock(&tech_subscriptions_lock);
+			return 1;
+		}
+	}
+	ast_rwlock_unlock(&tech_subscriptions_lock);
+
+	sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+	if (sub) {
+		/* ao2_find() handed us a reference we do not need to keep */
+		ao2_ref(sub, -1);
+		return 1;
+	}
+
+	ast_debug(1, "No subscription found for %s\n", buf);
+	return 0;
+}
+
+/*!
+ * \internal
+ * \brief Build the JSON representation of a message
+ *
+ * The result is an object with "from", "to" and "body" strings plus a
+ * "variables" array holding one single-key object per message variable.
+ *
+ * \param msg The message to convert
+ *
+ * \retval NULL on failure
+ * \retval A new JSON object on success; the caller owns the reference
+ */
+static struct ast_json *msg_to_json(struct ast_msg *msg)
+{
+	struct ast_json *json_obj;
+	struct ast_json *json_vars;
+	struct ast_msg_var_iterator *it_vars;
+	const char *name;
+	const char *value;
+
+	it_vars = ast_msg_var_iterator_init(msg);
+	if (!it_vars) {
+		return NULL;
+	}
+
+	json_vars = ast_json_array_create();
+	if (!json_vars) {
+		/* Do not leak the iterator on the error path */
+		ast_msg_var_iterator_destroy(it_vars);
+		return NULL;
+	}
+
+	while (ast_msg_var_iterator_next(msg, it_vars, &name, &value)) {
+		struct ast_json *json_tuple;
+
+		json_tuple = ast_json_pack("{s: s}", name, value);
+		if (!json_tuple) {
+			/*
+			 * JSON values are reference counted and must be released
+			 * with ast_json_unref(); the iterator has to go as well.
+			 */
+			ast_json_unref(json_vars);
+			ast_msg_var_iterator_destroy(it_vars);
+			return NULL;
+		}
+
+		ast_json_array_append(json_vars, json_tuple);
+		ast_msg_var_unref_current(it_vars);
+	}
+	ast_msg_var_iterator_destroy(it_vars);
+
+	json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}",
+		"from", ast_msg_get_from(msg),
+		"to", ast_msg_get_to(msg),
+		"body", ast_msg_get_body(msg),
+		"variables", json_vars);
+
+	return json_obj;
+}
+
+/*!
+ * \internal
+ * \brief Callback from the \c message API that dispatches a message to
+ * every subscribed application
+ *
+ * Technology subscriptions are checked first (case-insensitive prefix
+ * match on the endpoint name), then the endpoint subscriptions container.
+ *
+ * \param msg The message to dispatch
+ *
+ * \retval 0 the message was passed to the subscribed applications
+ * \retval -1 no subscription was found or the message could not be
+ *         converted to JSON
+ */
+static int handle_msg_cb(struct ast_msg *msg)
+{
+	struct message_subscription *sub = NULL;
+	struct ast_json *json_msg;
+	char buf[256];
+	int i;
+
+	msg_to_endpoint(msg, buf, sizeof(buf));
+
+	ast_rwlock_rdlock(&tech_subscriptions_lock);
+	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+		struct message_subscription *candidate = AST_VECTOR_GET(&tech_subscriptions, i);
+
+		if (candidate && !strncasecmp(candidate->token, buf, strlen(candidate->token))) {
+			/*
+			 * Take our reference while the lock still guarantees the
+			 * vector's reference is in place.
+			 */
+			sub = ao2_bump(candidate);
+			break;
+		}
+	}
+	ast_rwlock_unlock(&tech_subscriptions_lock);
+
+	if (!sub) {
+		sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+	}
+	if (!sub) {
+		return -1;
+	}
+
+	ast_debug(3, "Dispatching message for %s\n", buf);
+
+	json_msg = msg_to_json(msg);
+	if (!json_msg) {
+		ao2_ref(sub, -1);
+		return -1;
+	}
+
+	for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+		struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+		/* The callback's return value is not acted upon */
+		tuple->callback(buf, json_msg, tuple->pvt);
+	}
+
+	ast_json_unref(json_msg);
+	ao2_ref(sub, -1);
+	return 0;
+}
+
+/*!
+ * \brief The message handler registered with the \c message API
+ *
+ * Registered in messaging_init() and unregistered in messaging_cleanup().
+ */
+struct ast_msg_handler ari_msg_handler = {
+	.name = "ari",
+	.has_destination = has_destination_cb,
+	.handle_msg = handle_msg_cb,
+};
+
+/*!
+ * \internal
+ * \brief Vector comparison callback matching a subscription by token
+ *
+ * \retval 1 the subscription's token equals \c key
+ * \retval 0 otherwise
+ */
+static int messaging_subscription_cmp(struct message_subscription *sub, const char *key)
+{
+	return strcmp(sub->token, key) == 0;
+}
+
+/*!
+ * \internal
+ * \brief Vector comparison callback matching an application tuple by name
+ *
+ * \retval 1 the tuple's application name equals \c key
+ * \retval 0 otherwise
+ */
+static int application_tuple_cmp(struct application_tuple *item, const char *key)
+{
+	return strcmp(item->app_name, key) == 0;
+}
+
+/*!
+ * \internal
+ * \brief Determine whether an application is already part of a subscription
+ *
+ * \param sub The subscription to search
+ * \param app_name The application name to look for
+ *
+ * \retval 1 the application is subscribed
+ * \retval 0 the application is not subscribed
+ */
+static int is_app_subscribed(struct message_subscription *sub, const char *app_name)
+{
+	int idx = 0;
+
+	while (idx < AST_VECTOR_SIZE(&sub->applications)) {
+		struct application_tuple *entry = AST_VECTOR_GET(&sub->applications, idx);
+
+		if (entry != NULL && strcmp(entry->app_name, app_name) == 0) {
+			return 1;
+		}
+		idx++;
+	}
+
+	return 0;
+}
+
+/*!
+ * \internal
+ * \brief Find the existing subscription for an endpoint or technology
+ *
+ * Endpoints that have a resource are looked up in the endpoint
+ * subscriptions container by their ID; endpoints without a resource are
+ * looked up in the technology subscriptions vector by their technology.
+ *
+ * \param endpoint The endpoint to find the subscription for
+ *
+ * \retval NULL no subscription exists
+ * \retval The subscription, with a reference the caller must release
+ */
+static struct message_subscription *get_subscription(struct ast_endpoint *endpoint)
+{
+	struct message_subscription *found = NULL;
+	const char *tech;
+	int i;
+
+	if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+		/*
+		 * The container's hash and compare callbacks treat an
+		 * OBJ_SEARCH_KEY argument as a string, so search with the
+		 * endpoint ID rather than the endpoint object.
+		 */
+		return ao2_find(endpoint_subscriptions, ast_endpoint_get_id(endpoint), OBJ_SEARCH_KEY);
+	}
+
+	tech = ast_endpoint_get_tech(endpoint);
+
+	ast_rwlock_rdlock(&tech_subscriptions_lock);
+	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+		struct message_subscription *candidate = AST_VECTOR_GET(&tech_subscriptions, i);
+
+		/*
+		 * Only a real match may escape the loop; a non-matching element
+		 * must never be returned, as it carries no reference for the
+		 * caller.
+		 */
+		if (candidate && !strcmp(candidate->token, tech)) {
+			found = ao2_bump(candidate);
+			break;
+		}
+	}
+	ast_rwlock_unlock(&tech_subscriptions_lock);
+
+	return found;
+}
+
+/*!
+ * \brief Unsubscribe an application from messages from an endpoint
+ *
+ * Does nothing if the endpoint cannot be found, has no subscription, or
+ * the application is not part of that subscription. When the last
+ * application is removed, the subscription itself is removed from the
+ * endpoint container or the technology vector.
+ *
+ * \param app_name Name of the application to unsubscribe
+ * \param endpoint_id The ID of the endpoint to unsubscribe from
+ */
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
+{
+	RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+
+	endpoint = ast_endpoint_find_by_id(endpoint_id);
+	if (!endpoint) {
+		return;
+	}
+
+	sub = get_subscription(endpoint);
+	if (!sub) {
+		return;
+	}
+
+	ao2_lock(sub);
+	if (!is_app_subscribed(sub, app_name)) {
+		ao2_unlock(sub);
+		return;
+	}
+
+	AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
+	if (AST_VECTOR_SIZE(&sub->applications) == 0) {
+		if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+			ao2_unlink(endpoint_subscriptions, sub);
+		} else {
+			/*
+			 * The vector owns a reference to each entry, so let the
+			 * removal release it. Our own reference is still held
+			 * through the RAII variable.
+			 */
+			ast_rwlock_wrlock(&tech_subscriptions_lock);
+			AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+				messaging_subscription_cmp, ao2_cleanup);
+			ast_rwlock_unlock(&tech_subscriptions_lock);
+		}
+	}
+	ao2_unlock(sub);
+	/* The reference from get_subscription() is released by RAII_VAR; do not drop it twice */
+
+	ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+	ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
+		app_name, ast_endpoint_get_id(endpoint));
+}
+
+/*!
+ * \internal
+ * \brief Find the subscription for an endpoint, creating and registering
+ * it if it does not exist yet
+ *
+ * \param endpoint The endpoint or technology to subscribe to
+ *
+ * \retval NULL the subscription could not be allocated or registered
+ * \retval The subscription, with a reference the caller must release
+ */
+static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
+{
+	struct message_subscription *sub = get_subscription(endpoint);
+
+	if (sub) {
+		return sub;
+	}
+
+	sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+	if (!sub) {
+		return NULL;
+	}
+
+	if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+		if (!ao2_link(endpoint_subscriptions, sub)) {
+			/* Never hand back a subscription nobody can find again */
+			ao2_ref(sub, -1);
+			return NULL;
+		}
+	} else {
+		int failed;
+
+		ast_rwlock_wrlock(&tech_subscriptions_lock);
+		failed = AST_VECTOR_APPEND(&tech_subscriptions, ao2_bump(sub));
+		ast_rwlock_unlock(&tech_subscriptions_lock);
+
+		if (failed) {
+			/* Release the reference meant for the vector and the allocation one */
+			ao2_ref(sub, -1);
+			ao2_ref(sub, -1);
+			return NULL;
+		}
+	}
+
+	return sub;
+}
+
+/*!
+ * \brief Subscribe an application to an endpoint for messages
+ *
+ * Subscribing an application that is already subscribed is a successful
+ * no-op.
+ *
+ * \param app_name The name of the application to subscribe
+ * \param endpoint The endpoint object to subscribe to
+ * \param callback The callback to call when a message is received
+ * \param pvt An ao2 ref counted object that will be passed to the callback
+ *
+ * \retval 0 subscription was successful
+ * \retval -1 subscription failed
+ */
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
+{
+	RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+	struct application_tuple *tuple;
+
+	sub = get_or_create_subscription(endpoint);
+	if (!sub) {
+		return -1;
+	}
+
+	ao2_lock(sub);
+	if (is_app_subscribed(sub, app_name)) {
+		ao2_unlock(sub);
+		return 0;
+	}
+
+	tuple = application_tuple_alloc(app_name, callback, pvt);
+	if (!tuple) {
+		ao2_unlock(sub);
+		return -1;
+	}
+	if (AST_VECTOR_APPEND(&sub->applications, tuple)) {
+		/* The vector did not take ownership, so the tuple is still ours to release */
+		ao2_ref(tuple, -1);
+		ao2_unlock(sub);
+		return -1;
+	}
+	ao2_unlock(sub);
+
+	ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+	ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
+		app_name, ast_endpoint_get_id(endpoint));
+
+	return 0;
+}
+
+
+/*!
+ * \brief Tidy up the messaging layer
+ *
+ * Unregisters the message handler and releases the endpoint container,
+ * the technology subscriptions and their lock.
+ *
+ * \retval 0 always
+ */
+int messaging_cleanup(void)
+{
+	size_t i;
+
+	ast_msg_handler_unregister(&ari_msg_handler);
+
+	ao2_ref(endpoint_subscriptions, -1);
+	endpoint_subscriptions = NULL;
+
+	/* Each entry still carries the reference taken when it was appended */
+	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+		ao2_cleanup(AST_VECTOR_GET(&tech_subscriptions, i));
+	}
+	AST_VECTOR_FREE(&tech_subscriptions);
+	ast_rwlock_destroy(&tech_subscriptions_lock);
+
+	return 0;
+}
+
+/*!
+ * \brief Initialize the messaging layer
+ *
+ * Creates the endpoint subscriptions container, the technology
+ * subscriptions vector and its lock, then registers the message handler.
+ * Anything already set up is torn down again if a later step fails.
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_init(void)
+{
+	int vector_ready = 0;
+	int lock_ready = 0;
+
+	endpoint_subscriptions = ao2_t_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
+		ENDPOINTS_NUM_BUCKETS, message_subscription_hash_cb, NULL,
+		message_subscription_compare_cb, "Endpoint messaging subscription container creation");
+	if (!endpoint_subscriptions) {
+		return -1;
+	}
+
+	if (AST_VECTOR_INIT(&tech_subscriptions, 4)) {
+		goto unwind;
+	}
+	vector_ready = 1;
+
+	if (ast_rwlock_init(&tech_subscriptions_lock)) {
+		goto unwind;
+	}
+	lock_ready = 1;
+
+	if (ast_msg_handler_register(&ari_msg_handler)) {
+		goto unwind;
+	}
+
+	return 0;
+
+unwind:
+	/* Release only what was successfully set up, container first */
+	ao2_ref(endpoint_subscriptions, -1);
+	if (vector_ready) {
+		AST_VECTOR_FREE(&tech_subscriptions);
+	}
+	if (lock_ready) {
+		ast_rwlock_destroy(&tech_subscriptions_lock);
+	}
+	return -1;
+}
Propchange: team/mjordan/12-messaging/res/stasis/messaging.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/mjordan/12-messaging/res/stasis/messaging.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/mjordan/12-messaging/res/stasis/messaging.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/mjordan/12-messaging/res/stasis/messaging.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/12-messaging/res/stasis/messaging.h?view=auto&rev=418447
==============================================================================
--- team/mjordan/12-messaging/res/stasis/messaging.h (added)
+++ team/mjordan/12-messaging/res/stasis/messaging.h Sat Jul 12 22:21:01 2014
@@ -1,0 +1,83 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_RES_STASIS_MESSAGING_H
+#define _ASTERISK_RES_STASIS_MESSAGING_H
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan at digium.com>
+ * \since 12.4.0
+ */
+
+/*!
+ * \brief Callback handler for when a message is received from the core
+ *
+ * \param endpoint_id The ID of the endpoint that we received the message from
+ * \param json_msg JSON representation of the text message
+ * \param pvt ao2 ref counted pvt passed during registration
+ *
+ * \retval 0 the message was handled
+ * \retval non-zero the message was not handled
+ *
+ * \note The dispatcher in messaging.c invokes the callback of every
+ * subscribed application and does not act on the return value.
+ */
+typedef int (* message_received_cb)(const char *endpoint_id, struct ast_json *json_msg, void *pvt);
+
+/*!
+ * \brief Unsubscribe an application from messages from a particular endpoint
+ *
+ * \param app_name Name of the stasis application to unsubscribe from messaging
+ * \param endpoint_id The ID of the endpoint we no longer care about
+ *
+ * \note This function does not return a value. It silently does nothing if
+ * the endpoint cannot be found or the application is not subscribed to it.
+ */
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id);
+
+/*!
+ * \brief Subscribe an application to an endpoint for messages
+ *
+ * \param app_name The name of the \ref stasis application to subscribe to \c endpoint
+ * \param endpoint The endpoint object to subscribe to
+ * \param callback The callback to call when a message is received
+ * \param pvt An ao2 ref counted object that will be passed to the callback.
+ *
+ * \retval 0 subscription was successful (including when the application
+ *         was already subscribed)
+ * \retval -1 subscription failed
+ */
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt);
+
+/*!
+ * \brief Tidy up the messaging layer
+ *
+ * Unregisters the message handler and releases the subscription storage
+ * set up by messaging_init().
+ *
+ * \retval 0 success
+ * \retval -1 failure (the implementation in messaging.c currently always
+ *         returns 0)
+ */
+int messaging_cleanup(void);
+
+/*!
+ * \brief Initialize the messaging layer
+ *
+ * Allocates the subscription storage and registers the message handler
+ * with the message API.
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_init(void);
+
+#endif /* #define _ASTERISK_RES_STASIS_MESSAGING_H */
Propchange: team/mjordan/12-messaging/res/stasis/messaging.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/mjordan/12-messaging/res/stasis/messaging.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/mjordan/12-messaging/res/stasis/messaging.h
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: team/mjordan/12-messaging/tests/test_message.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/12-messaging/tests/test_message.c?view=diff&rev=418447&r1=418446&r2=418447
==============================================================================
--- team/mjordan/12-messaging/tests/test_message.c (original)
+++ team/mjordan/12-messaging/tests/test_message.c Sat Jul 12 22:21:01 2014
@@ -728,7 +728,7 @@
ast_msg_set_context(msg, NULL);
ast_test_validate(test, ast_msg_has_destination(msg) == 1);
- ast_msg_set_to(msg, "__I_SHOULD_NOT_EXIST_PLZ__");
+ ast_msg_set_to(msg, "__I_SHOULD_NOT_EXIST_PLZ__");
ast_test_validate(test, ast_msg_has_destination(msg) == 0);
result = ast_msg_handler_unregister(&test_msg_handler);
More information about the asterisk-commits
mailing list