[svn-commits] mjordan: trunk r405313 - in /trunk: ./ include/asterisk/ main/ tests/

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sun Jan 12 16:07:03 CST 2014


Author: mjordan
Date: Sun Jan 12 16:07:01 2014
New Revision: 405313

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=405313
Log:
stasis: Add methods to allow for synchronous publishing to subscriber

This patch adds an API call to Stasis that allows a publisher to publish a
stasis message that will not return until a specific subscriber handles the
message. Since a subscriber can have their own forwarding topic which orders
messages from many topics, this allows a publisher who knows of that subscriber
to synchronize to that subscriber regardless of the forwarding relationships
between topics.

This is of particular use for dialplan applications that need to synchronize
on a particular subscriber's handling of a message.

(issue ASTERISK-22884)
Reported by: Matt Jordan

Review: https://reviewboard.asterisk.org/r/3099/
........

Merged revisions 405311 from http://svn.asterisk.org/svn/asterisk/branches/12

Modified:
    trunk/   (props changed)
    trunk/include/asterisk/stasis.h
    trunk/include/asterisk/stasis_message_router.h
    trunk/main/stasis.c
    trunk/main/stasis_message_router.c
    trunk/tests/test_stasis.c

Propchange: trunk/
------------------------------------------------------------------------------
Binary property 'branch-12-merged' - no diff available.

Modified: trunk/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis.h?view=diff&rev=405313&r1=405312&r2=405313
==============================================================================
--- trunk/include/asterisk/stasis.h (original)
+++ trunk/include/asterisk/stasis.h Sun Jan 12 16:07:01 2014
@@ -185,6 +185,12 @@
  * \since 12
  */
 struct stasis_message;
+
+/*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
 
 /*!
  * \brief Structure containing callbacks for Stasis message sanitization
@@ -377,19 +383,34 @@
  * \brief Publish a message to a topic's subscribers.
  * \param topic Topic.
  * \param message Message to publish.
+ *
+ * This call is asynchronous and will return immediately upon queueing
+ * the message for delivery to the topic's subscribers.
+ *
  * \since 12
  */
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
 
+/*!
+ * \brief Publish a message to a topic's subscribers, synchronizing
+ * on the specified subscriber
+ * \param sub Subscription to synchronize on.
+ * \param message Message to publish.
+ *
+ * The caller of stasis_publish_sync will block until the specified
+ * subscriber completes handling of the message.
+ *
+ * All other subscribers to the topic the \ref stasis_subscription
+ * is subscribed to are also delivered the message; this delivery however
+ * happens asynchronously.
+ *
+ * \since 12.1.0
+ */
+void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
+
 /*! @} */
 
 /*! @{ */
-
-/*!
- * \brief Opaque type for a Stasis subscription.
- * \since 12
- */
-struct stasis_subscription;
 
 /*!
  * \brief Callback function type for Stasis subscriptions.

Modified: trunk/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_message_router.h?view=diff&rev=405313&r1=405312&r2=405313
==============================================================================
--- trunk/include/asterisk/stasis_message_router.h (original)
+++ trunk/include/asterisk/stasis_message_router.h Sun Jan 12 16:07:01 2014
@@ -93,6 +93,24 @@
 int stasis_message_router_is_done(struct stasis_message_router *router);
 
 /*!
+ * \brief Publish a message to a message router's subscription synchronously
+ *
+ * \param router Router
+ * \param message The \ref stasis message
+ *
+ * This should be used when a message needs to be published synchronously to
+ * the underlying subscription created by a message router. This is analogous
+ * to \ref stasis_publish_sync.
+ *
+ * Note that the caller will be blocked until the thread servicing the message
+ * on the message router's subscription completes handling of the message.
+ *
+ * \since 12.1.0
+ */
+void stasis_message_router_publish_sync(struct stasis_message_router *router,
+	struct stasis_message *message);
+
+/*!
  * \brief Add a route to a message router.
  *
  * A particular \a message_type may have at most one route per \a router. If

Modified: trunk/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis.c?view=diff&rev=405313&r1=405312&r2=405313
==============================================================================
--- trunk/main/stasis.c (original)
+++ trunk/main/stasis.c Sun Jan 12 16:07:01 2014
@@ -505,11 +505,11 @@
 }
 
 /*!
- * \brief Dispatch a message to a subscriber
- * \param data \ref dispatch object
+ * \internal \brief Dispatch a message to a subscriber asynchronously
+ * \param local \ref ast_taskprocessor_local object
  * \return 0
  */
-static int dispatch_exec(struct ast_taskprocessor_local *local)
+static int dispatch_exec_async(struct ast_taskprocessor_local *local)
 {
 	struct stasis_subscription *sub = local->local_data;
 	struct stasis_message *message = local->data;
@@ -520,23 +520,105 @@
 	return 0;
 }
 
+/*!
+ * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
+ * a published message to a subscriber
+ */
+struct sync_task_data {
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	int complete;
+	void *task_data;
+};
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber synchronously
+ * \param local \ref ast_taskprocessor_local object
+ * \return 0
+ */
+static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
+{
+	struct stasis_subscription *sub = local->local_data;
+	struct sync_task_data *std = local->data;
+	struct stasis_message *message = std->task_data;
+
+	subscription_invoke(sub, message);
+	ao2_cleanup(message);
+
+	ast_mutex_lock(&std->lock);
+	std->complete = 1;
+	ast_cond_signal(&std->cond);
+	ast_mutex_unlock(&std->lock);
+
+	return 0;
+}
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber
+ * \param sub The subscriber to dispatch to
+ * \param message The message to send
+ * \param synchronous If non-zero, synchronize on the subscriber receiving
+ * the message
+ */
 static void dispatch_message(struct stasis_subscription *sub,
-	struct stasis_message *message)
-{
-	if (sub->mailbox) {
-		ao2_bump(message);
-		if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+	struct stasis_message *message,
+	int synchronous)
+{
+	if (!sub->mailbox) {
+		/* Dispatch directly */
+		subscription_invoke(sub, message);
+		return;
+	}
+
+	/* Bump the message for the taskprocessor push. This will get de-ref'd
+	 * by the task processor callback.
+	 */
+	ao2_bump(message);
+	if (!synchronous) {
+		if (ast_taskprocessor_push_local(sub->mailbox,
+			                             dispatch_exec_async,
+			                             message) != 0) {
 			/* Push failed; ugh. */
-			ast_log(LOG_ERROR, "Dropping dispatch\n");
+			ast_log(LOG_ERROR, "Dropping async dispatch\n");
 			ao2_cleanup(message);
 		}
 	} else {
-		/* Dispatch directly */
-		subscription_invoke(sub, message);
-	}
-}
-
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+		struct sync_task_data std;
+
+		ast_mutex_init(&std.lock);
+		ast_cond_init(&std.cond, NULL);
+		std.complete = 0;
+		std.task_data = message;
+
+		if (ast_taskprocessor_push_local(sub->mailbox,
+			                             dispatch_exec_sync,
+			                             &std)) {
+			/* Push failed; ugh. */
+			ast_log(LOG_ERROR, "Dropping sync dispatch\n");
+			ao2_cleanup(message);
+			return;
+		}
+
+		ast_mutex_lock(&std.lock);
+		while (!std.complete) {
+			ast_cond_wait(&std.cond, &std.lock);
+		}
+		ast_mutex_unlock(&std.lock);
+
+		ast_mutex_destroy(&std.lock);
+		ast_cond_destroy(&std.cond);
+	}
+}
+
+/*!
+ * \internal \brief Publish a message to a topic's subscribers
+ * \param topic The topic to publish to
+ * \param message The message to publish
+ * \param sync_sub An optional subscriber of the topic to publish synchronously
+ * to
+ */
+static void publish_msg(struct stasis_topic *topic,
+	struct stasis_message *message, struct stasis_subscription *sync_sub)
 {
 	size_t i;
 
@@ -554,10 +636,22 @@
 
 		ast_assert(sub != NULL);
 
-		dispatch_message(sub, message);
+		dispatch_message(sub, message, (sub == sync_sub));
 	}
 	ao2_unlock(topic);
 	ao2_ref(topic, -1);
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+	publish_msg(topic, message, NULL);
+}
+
+void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
+{
+	ast_assert(sub != NULL);
+
+	publish_msg(sub->topic, message, sub);
 }
 
 /*!
@@ -721,7 +815,7 @@
 	stasis_publish(topic, msg);
 
 	/* Now we have to dispatch to the subscription itself */
-	dispatch_message(sub, msg);
+	dispatch_message(sub, msg, 0);
 }
 
 struct topic_pool_entry {

Modified: trunk/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis_message_router.c?view=diff&rev=405313&r1=405312&r2=405313
==============================================================================
--- trunk/main/stasis_message_router.c (original)
+++ trunk/main/stasis_message_router.c Sun Jan 12 16:07:01 2014
@@ -261,6 +261,16 @@
 	return stasis_subscription_is_done(router->subscription);
 }
 
+void stasis_message_router_publish_sync(struct stasis_message_router *router,
+	struct stasis_message *message)
+{
+	ast_assert(router != NULL);
+
+	ao2_bump(router);
+	stasis_publish_sync(router->subscription, message);
+	ao2_cleanup(router);
+}
+
 int stasis_message_router_add(struct stasis_message_router *router,
 	struct stasis_message_type *message_type,
 	stasis_subscription_cb callback, void *data)

Modified: trunk/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/tests/test_stasis.c?view=diff&rev=405313&r1=405312&r2=405313
==============================================================================
--- trunk/tests/test_stasis.c (original)
+++ trunk/tests/test_stasis.c Sun Jan 12 16:07:01 2014
@@ -206,6 +206,27 @@
 	ast_cond_signal(&consumer->out);
 }
 
+static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+	struct consumer *consumer = data;
+	RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
+	SCOPED_MUTEX(lock, &consumer->lock);
+
+	if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
+
+		++consumer->messages_rxed_len;
+		consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
+		ast_assert(consumer->messages_rxed != NULL);
+		consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
+		ao2_ref(message, +1);
+	}
+
+	if (stasis_subscription_final_message(sub, message)) {
+		consumer->complete = 1;
+		consumer_needs_cleanup = consumer;
+	}
+}
+
 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
 {
 	struct timeval start = ast_tvnow();
@@ -341,8 +362,8 @@
 	case TEST_INIT:
 		info->name = __func__;
 		info->category = test_category;
-		info->summary = "Test simple subscriptions";
-		info->description = "Test simple subscriptions";
+		info->summary = "Test publishing";
+		info->description = "Test publishing";
 		return AST_TEST_NOT_RUN;
 	case TEST_EXECUTE:
 		break;
@@ -366,6 +387,53 @@
 	stasis_publish(topic, test_message);
 
 	actual_len = consumer_wait_for(consumer, 1);
+	ast_test_validate(test, 1 == actual_len);
+	actual = stasis_message_data(consumer->messages_rxed[0]);
+	ast_test_validate(test, test_data == actual);
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(publish_sync)
+{
+	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+	int actual_len;
+	const char *actual;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test synchronous publishing";
+		info->description = "Test synchronous publishing";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	topic = stasis_topic_create("TestTopic");
+	ast_test_validate(test, NULL != topic);
+
+	consumer = consumer_create(1);
+	ast_test_validate(test, NULL != consumer);
+
+	uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
+	ast_test_validate(test, NULL != uut);
+	ao2_ref(consumer, +1);
+
+	test_data = ao2_alloc(1, NULL);
+	ast_test_validate(test, NULL != test_data);
+	test_message_type = stasis_message_type_create("TestMessage", NULL);
+	test_message = stasis_message_create(test_message_type, test_data);
+
+	stasis_publish_sync(uut, test_message);
+
+	actual_len = consumer->messages_rxed_len;
 	ast_test_validate(test, 1 == actual_len);
 	actual = stasis_message_data(consumer->messages_rxed[0]);
 	ast_test_validate(test, test_data == actual);
@@ -1324,6 +1392,7 @@
 	AST_TEST_UNREGISTER(message);
 	AST_TEST_UNREGISTER(subscription_messages);
 	AST_TEST_UNREGISTER(publish);
+	AST_TEST_UNREGISTER(publish_sync);
 	AST_TEST_UNREGISTER(unsubscribe_stops_messages);
 	AST_TEST_UNREGISTER(forward);
 	AST_TEST_UNREGISTER(cache_filter);
@@ -1347,6 +1416,7 @@
 	AST_TEST_REGISTER(message);
 	AST_TEST_REGISTER(subscription_messages);
 	AST_TEST_REGISTER(publish);
+	AST_TEST_REGISTER(publish_sync);
 	AST_TEST_REGISTER(unsubscribe_stops_messages);
 	AST_TEST_REGISTER(forward);
 	AST_TEST_REGISTER(cache_filter);




More information about the svn-commits mailing list