[svn-commits] mjordan: branch mjordan/12-messaging r418447 - in /team/mjordan/12-messaging:...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sat Jul 12 22:21:03 CDT 2014


Author: mjordan
Date: Sat Jul 12 22:21:01 2014
New Revision: 418447

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=418447
Log:
Fix documentation, add res_stasis messaging

Added:
    team/mjordan/12-messaging/res/stasis/messaging.c   (with props)
    team/mjordan/12-messaging/res/stasis/messaging.h   (with props)
Modified:
    team/mjordan/12-messaging/tests/test_message.c

Added: team/mjordan/12-messaging/res/stasis/messaging.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/12-messaging/res/stasis/messaging.c?view=auto&rev=418447
==============================================================================
--- team/mjordan/12-messaging/res/stasis/messaging.c (added)
+++ team/mjordan/12-messaging/res/stasis/messaging.c Sat Jul 12 22:21:01 2014
@@ -1,0 +1,531 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan at digium.com>
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/message.h"
+#include "asterisk/endpoints.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/vector.h"
+#include "asterisk/lock.h"
+#include "asterisk/utils.h"
+#include "asterisk/test.h"
+#include "messaging.h"
+
+/*!
+ * \brief Number of buckets for the \ref endpoint_subscriptions container
+ */
+#define ENDPOINTS_NUM_BUCKETS 127
+
+/*! \brief Storage object for an application */
+struct application_tuple {
+	/*! ao2 ref counted private object to pass to the callback */
+	void *pvt;
+	/*! The callback to call when this application has a message */
+	message_received_cb callback;
+	/*! The name (key) of the application */
+	char app_name[];
+};
+
+/*! \brief A subscription to some endpoint or technology */
+struct message_subscription {
+	/*! The applications that have subscribed to this endpoint or tech */
+	AST_VECTOR(, struct application_tuple *) applications;
+	/*! The name of this endpoint or tech */
+	char token[];
+};
+
+/*! \brief The subscriptions to endpoints */
+static struct ao2_container *endpoint_subscriptions;
+
+/*!
+ * \brief The subscriptions to technologies
+ *
+ * \note These are stored separately from standard endpoints, given how
+ * relatively few of them there are.
+ */
+static AST_VECTOR(,struct message_subscription *) tech_subscriptions;
+
+/*! \brief RWLock for \c tech_subscriptions */
+static ast_rwlock_t tech_subscriptions_lock;
+
+/*! \internal \brief Destructor for \c application_tuple */
+static void application_tuple_dtor(void *obj)
+{
+	struct application_tuple *tuple = obj;
+
+	ao2_cleanup(tuple->pvt);
+}
+
+/*! \internal \brief Constructor for \c application_tuple */
+static struct application_tuple *application_tuple_alloc(const char *app_name, message_received_cb callback, void *pvt)
+{
+	struct application_tuple *tuple;
+	size_t size = sizeof(*tuple) + strlen(app_name) + 1;
+
+	ast_assert(callback != NULL);
+
+	tuple = ao2_t_alloc(size, application_tuple_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!tuple) {
+		return NULL;
+	}
+
+	strcpy(tuple->app_name, app_name); /* Safe */
+	tuple->pvt = ao2_bump(pvt);
+	tuple->callback = callback;
+
+	return tuple;
+}
+
+/*! \internal \brief Destructor for \ref message_subscription */
+static void message_subscription_dtor(void *obj)
+{
+	struct message_subscription *sub = obj;
+	int i;
+
+	for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+		struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+		ao2_cleanup(tuple);
+	}
+	AST_VECTOR_FREE(&sub->applications);
+}
+
+/*! \internal \brief Constructor for \ref message_subscription */
+static struct message_subscription *message_subscription_alloc(const char *token)
+{
+	struct message_subscription *sub;
+	size_t size = sizeof(*sub) + strlen(token) + 1;
+
+	sub = ao2_t_alloc(size, message_subscription_dtor, AO2_ALLOC_OPT_LOCK_RWLOCK);
+	if (!sub) {
+		return NULL;
+	}
+	strcpy(sub->token, token); /* Safe */
+
+	return sub;
+}
+
+/*! AO2 hash function for \ref message_subscription */
+static int message_subscription_hash_cb(const void *obj, const int flags)
+{
+	const struct message_subscription *sub;
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		sub = obj;
+		key = sub->token;
+		break;
+	default:
+		/* Hash can only work on something with a full key. */
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_hash(key);
+}
+
+/*! AO2 comparison function for \ref message_subscription */
+static int message_subscription_compare_cb(void *obj, void *arg, int flags)
+{
+	const struct message_subscription *object_left = obj;
+	const struct message_subscription *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = object_right->token;
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = strcmp(object_left->token, right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/*
+		 * We could also use a partial key struct containing a length
+		 * so strlen() does not get called for every comparison instead.
+		 */
+		cmp = strncmp(object_left->token, right_key, strlen(right_key));
+		break;
+	default:
+		/*
+		 * What arg points to is specific to this traversal callback
+		 * and has no special meaning to astobj2.
+		 */
+		cmp = 0;
+		break;
+	}
+	if (cmp) {
+		return 0;
+	}
+	/*
+	 * At this point the traversal callback is identical to a sorted
+	 * container.
+	 */
+	return CMP_MATCH;
+}
+
+/*! \internal \brief Convert a \c ast_msg To/From URI to a Stasis endpoint name */
+static void msg_to_endpoint(const struct ast_msg *msg, char *buf, size_t len)
+{
+	const char *endpoint = ast_msg_get_endpoint(msg);
+
+	snprintf(buf, len, "%s%s%s", ast_msg_get_tech(msg),
+		ast_strlen_zero(endpoint) ? "" : "/",
+		S_OR(endpoint, ""));
+}
+
+/*! \internal
+ * \brief Callback from the \c message API that determines if we can handle
+ * this message
+ */
+static int has_destination_cb(const struct ast_msg *msg)
+{
+	struct message_subscription *sub;
+	int i;
+	char buf[256];
+
+	msg_to_endpoint(msg, buf, sizeof(buf));
+
+	ast_rwlock_rdlock(&tech_subscriptions_lock);
+	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+		sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+		if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
+		            || !strncasecmp(sub->token, buf, strlen(sub->token)))) {
+			ast_rwlock_unlock(&tech_subscriptions_lock);
+			sub = NULL; /* No ref bump! */
+			goto match;
+		}
+
+	}
+	ast_rwlock_unlock(&tech_subscriptions_lock);
+
+	sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+	if (sub) {
+		goto match;
+	}
+
+	ast_debug(1, "No subscription found for %s\n", buf);
+	return 0;
+
+match:
+	ao2_cleanup(sub);
+	return 1;
+}
+
+static struct ast_json *msg_to_json(struct ast_msg *msg)
+{
+	struct ast_json *json_obj;
+	struct ast_json *json_vars;
+	struct ast_msg_var_iterator *it_vars;
+	const char *name;
+	const char *value;
+
+	it_vars = ast_msg_var_iterator_init(msg);
+	if (!it_vars) {
+		return NULL;
+	}
+
+	json_vars = ast_json_array_create();
+	if (!json_vars) {
+		return NULL;
+	}
+
+	while (ast_msg_var_iterator_next(msg, it_vars, &name, &value)) {
+		struct ast_json *json_tuple;
+
+		json_tuple = ast_json_pack("{s: s}", name, value);
+		if (!json_tuple) {
+			ast_json_free(json_vars);
+			return NULL;
+		}
+
+		ast_json_array_append(json_vars, json_tuple);
+		ast_msg_var_unref_current(it_vars);
+	}
+	ast_msg_var_iterator_destroy(it_vars);
+
+	json_obj = ast_json_pack("{s: s, s: s, s: s, s: o}",
+		"from", ast_msg_get_from(msg),
+		"to", ast_msg_get_to(msg),
+		"body", ast_msg_get_body(msg),
+		"variables", json_vars);
+
+	return json_obj;
+}
+
+static int handle_msg_cb(struct ast_msg *msg)
+{
+	struct message_subscription *sub;
+	int i;
+	char buf[256];
+	const char *endpoint_name;
+	struct ast_json *json_msg;
+
+	msg_to_endpoint(msg, buf, sizeof(buf));
+
+	ast_rwlock_rdlock(&tech_subscriptions_lock);
+	for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+		sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+		if (!sub) {
+			continue;
+		}
+
+		if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
+			ast_rwlock_unlock(&tech_subscriptions_lock);
+			ao2_bump(sub);
+			endpoint_name = buf;
+			goto match;
+		}
+	}
+	ast_rwlock_unlock(&tech_subscriptions_lock);
+
+	sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
+	if (sub) {
+		endpoint_name = buf;
+		goto match;
+	}
+
+	return -1;
+
+match:
+	ast_debug(3, "Dispatching message for %s\n", endpoint_name);
+
+	json_msg = msg_to_json(msg);
+	if (!json_msg) {
+		ao2_ref(sub, -1);
+		return -1;
+	}
+
+	for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+		struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i);
+
+		tuple->callback(endpoint_name, json_msg, tuple->pvt);
+	}
+
+	ast_json_unref(json_msg);
+	ao2_ref(sub, -1);
+	return 0;
+}
+
+struct ast_msg_handler ari_msg_handler = {
+	.name = "ari",
+	.handle_msg = handle_msg_cb,
+	.has_destination = has_destination_cb,
+};
+
+static int messaging_subscription_cmp(struct message_subscription *sub, const char *key)
+{
+	return !strcmp(sub->token, key) ? 1 : 0;
+}
+
+static int application_tuple_cmp(struct application_tuple *item, const char *key)
+{
+	return !strcmp(item->app_name, key) ? 1 : 0;
+}
+
+static int is_app_subscribed(struct message_subscription *sub, const char *app_name)
+{
+	int i;
+
+	for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) {
+		struct application_tuple *tuple;
+
+		tuple = AST_VECTOR_GET(&sub->applications, i);
+		if (tuple && !strcmp(tuple->app_name, app_name)) {
+			return 1;
+		}
+	}
+
+	return 0;
+}
+
+static struct message_subscription *get_subscription(struct ast_endpoint *endpoint)
+{
+	struct message_subscription *sub = NULL;
+
+	if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+		sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
+	} else {
+		int i;
+
+		ast_rwlock_rdlock(&tech_subscriptions_lock);
+		for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
+			sub = AST_VECTOR_GET(&tech_subscriptions, i);
+
+			if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
+				ao2_bump(sub);
+				break;
+			}
+		}
+		ast_rwlock_unlock(&tech_subscriptions_lock);
+	}
+
+	return sub;
+}
+
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id)
+{
+	RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+	RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
+
+	endpoint = ast_endpoint_find_by_id(endpoint_id);
+	if (!endpoint) {
+		return;
+	}
+
+	sub = get_subscription(endpoint);
+	if (!sub) {
+		return;
+	}
+
+	ao2_lock(sub);
+	if (!is_app_subscribed(sub, app_name)) {
+		ao2_unlock(sub);
+		return;
+	}
+
+	AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
+	if (AST_VECTOR_SIZE(&sub->applications) == 0) {
+		if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+			ao2_unlink(endpoint_subscriptions, sub);
+		} else {
+			ast_rwlock_wrlock(&tech_subscriptions_lock);
+			AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
+				messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
+			ast_rwlock_unlock(&tech_subscriptions_lock);
+		}
+	}
+	ao2_unlock(sub);
+	ao2_ref(sub, -1);
+
+	ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+	ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
+		app_name, ast_endpoint_get_id(endpoint));
+}
+
+static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
+{
+	struct message_subscription *sub = get_subscription(endpoint);
+
+	if (sub) {
+		return sub;
+	}
+
+	sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
+	if (!sub) {
+		return NULL;
+	}
+
+	if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
+		ao2_link(endpoint_subscriptions, sub);
+	} else {
+		ast_rwlock_wrlock(&tech_subscriptions_lock);
+		AST_VECTOR_APPEND(&tech_subscriptions, ao2_bump(sub));
+		ast_rwlock_unlock(&tech_subscriptions_lock);
+	}
+
+	return sub;
+}
+
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt)
+{
+	RAII_VAR(struct message_subscription *, sub, NULL, ao2_cleanup);
+	struct application_tuple *tuple;
+
+	sub = get_or_create_subscription(endpoint);
+	if (!sub) {
+		return -1;
+	}
+
+	ao2_lock(sub);
+	if (is_app_subscribed(sub, app_name)) {
+		ao2_unlock(sub);
+		return 0;
+	}
+
+	tuple = application_tuple_alloc(app_name, callback, pvt);
+	if (!tuple) {
+		ao2_unlock(sub);
+		return -1;
+	}
+	AST_VECTOR_APPEND(&sub->applications, tuple);
+	ao2_unlock(sub);
+
+	ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
+	ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
+		app_name, ast_endpoint_get_id(endpoint));
+
+	return 0;
+}
+
+
+int messaging_cleanup(void)
+{
+	ast_msg_handler_unregister(&ari_msg_handler);
+	ao2_ref(endpoint_subscriptions, -1);
+	AST_VECTOR_FREE(&tech_subscriptions);
+	ast_rwlock_destroy(&tech_subscriptions_lock);\
+
+	return 0;
+}
+
+int messaging_init(void)
+{
+	endpoint_subscriptions = ao2_t_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
+		ENDPOINTS_NUM_BUCKETS, message_subscription_hash_cb, NULL,
+		message_subscription_compare_cb, "Endpoint messaging subscription container creation");
+	if (!endpoint_subscriptions) {
+		return -1;
+	}
+
+	if (AST_VECTOR_INIT(&tech_subscriptions, 4)) {
+		ao2_ref(endpoint_subscriptions, -1);
+		return -1;
+	}
+
+	if (ast_rwlock_init(&tech_subscriptions_lock)) {
+		ao2_ref(endpoint_subscriptions, -1);
+		AST_VECTOR_FREE(&tech_subscriptions);
+		return -1;
+	}
+
+	if (ast_msg_handler_register(&ari_msg_handler)) {
+		ao2_ref(endpoint_subscriptions, -1);
+		AST_VECTOR_FREE(&tech_subscriptions);
+		ast_rwlock_destroy(&tech_subscriptions_lock);
+		return -1;
+	}
+
+	return 0;
+}

Propchange: team/mjordan/12-messaging/res/stasis/messaging.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/mjordan/12-messaging/res/stasis/messaging.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/mjordan/12-messaging/res/stasis/messaging.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/mjordan/12-messaging/res/stasis/messaging.h
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/12-messaging/res/stasis/messaging.h?view=auto&rev=418447
==============================================================================
--- team/mjordan/12-messaging/res/stasis/messaging.h (added)
+++ team/mjordan/12-messaging/res/stasis/messaging.h Sat Jul 12 22:21:01 2014
@@ -1,0 +1,83 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2014, Digium, Inc.
+ *
+ * Matt Jordan <mjordan at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_RES_STASIS_MESSAGING_H
+#define _ASTERISK_RES_STASIS_MESSAGING_H
+
+/*!
+ * \file
+ *
+ * \brief Stasis out-of-call text message support
+ *
+ * \author Matt Jordan <mjordan at digium.com>
+ * \since 12.4.0
+ */
+
+/*!
+ * \brief Callback handler for when a message is received from the core
+ *
+ * \param endpoint_id The ID of the endpoint that we received the message from
+ * \param json_msg JSON representation of the text message
+ * \param pvt ao2 ref counted pvt passed during registration
+ *
+ * \retval 0 the message was handled
+ * \retval non-zero the message was not handled
+ */
+typedef int (* message_received_cb)(const char *endpoint_id, struct ast_json *json_msg, void *pvt);
+
+/*!
+ * \brief Subscribe for messages from a particular endpoint
+ *
+ * \param app_name Name of the stasis application to unsubscribe from messaging
+ * \param endpoint_id The ID of the endpoint we no longer care about
+ *
+ * \retval 0 success
+ * \retval -1 error
+ */
+void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoint_id);
+
+/*!
+ * \brief Subscribe an application to an endpoint for messages
+ *
+ * \param app_name The name of the \ref stasis application to subscribe to \c endpoint
+ * \param endpoint The endpoint object to subscribe to
+ * \param message_received_cb The callback to call when a message is received
+ * \param pvt An ao2 ref counted object that will be passed to the callback.
+ *
+ * \retval 0 subscription was successful
+ * \retval -1 subscription failed
+ */
+int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *endpoint, message_received_cb callback, void *pvt);
+
+/*!
+ * \brief Tidy up the messaging layer
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_cleanup(void);
+
+/*!
+ * \brief Initialize the messaging layer
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int messaging_init(void);
+
+#endif /* #define _ASTERISK_RES_STASIS_MESSAGING_H  */

Propchange: team/mjordan/12-messaging/res/stasis/messaging.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/mjordan/12-messaging/res/stasis/messaging.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/mjordan/12-messaging/res/stasis/messaging.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: team/mjordan/12-messaging/tests/test_message.c
URL: http://svnview.digium.com/svn/asterisk/team/mjordan/12-messaging/tests/test_message.c?view=diff&rev=418447&r1=418446&r2=418447
==============================================================================
--- team/mjordan/12-messaging/tests/test_message.c (original)
+++ team/mjordan/12-messaging/tests/test_message.c Sat Jul 12 22:21:01 2014
@@ -728,7 +728,7 @@
 	ast_msg_set_context(msg, NULL);
 	ast_test_validate(test, ast_msg_has_destination(msg) == 1);
 
-	ast_msg_set_to(msg, "__I_SHOULD_NOT_EXIST_PLZ__");	
+	ast_msg_set_to(msg, "__I_SHOULD_NOT_EXIST_PLZ__");
 	ast_test_validate(test, ast_msg_has_destination(msg) == 0);
 
 	result = ast_msg_handler_unregister(&test_msg_handler);




More information about the svn-commits mailing list