[asterisk-commits] dlee: trunk r384413 - in /trunk: main/stasis.c tests/test_stasis.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Apr 1 08:37:54 CDT 2013


Author: dlee
Date: Mon Apr  1 08:37:51 2013
New Revision: 384413

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=384413
Log:
stasis: Fixed message ordering issues when forwarding

This patch fixes an issue of message ordering that occurs when
multiple topics are forwarded to an aggregator topic (such as
ast_channel_topic_all()).

It is (very reasonably) expected that the rules governing message
dispatch order still apply, so long as the messages start from the
same thread, and are received by the same subscription. Because the
existing code had an additional layer of dispatching via the Stasis
thread pool for forwards, those promises couldn't be kept.

Forwarding subscriptions no longer have their own mailbox, and now
dispatch directly from the forwarding topic's stasis_publish()
call. This means that the topic's lock is held not only for the
duration of a message's dispatch, but also for the dispatch of all
the forwards. This shouldn't be a problem right now, but it could
become one if an aggregator topic had many subscribers. We can write
more clever code when the time comes, if necessary.

Review: https://reviewboard.asterisk.org/r/2419/

Modified:
    trunk/main/stasis.c
    trunk/tests/test_stasis.c

Modified: trunk/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis.c?view=diff&rev=384413&r1=384412&r2=384413
==============================================================================
--- trunk/main/stasis.c (original)
+++ trunk/main/stasis.c Mon Apr  1 08:37:51 2013
@@ -61,7 +61,6 @@
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
-struct stasis_subscription;
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 static void topic_dtor(void *obj)
@@ -127,9 +126,30 @@
 	sub->mailbox = NULL;
 }
 
+/*!
+ * \brief Invoke the subscription's callback.
+ * \param sub Subscription to invoke.
+ * \param topic Topic message was published to.
+ * \param message Message to send.
+ */
+static void subscription_invoke(struct stasis_subscription *sub,
+				  struct stasis_topic *topic,
+				  struct stasis_message *message)
+{
+	/* No lock taken on sub; only sub->callback and sub->data are read here */
+	sub->callback(sub->data,
+		      sub,
+		      topic,
+		      message);
+}
+
 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
 
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+static struct stasis_subscription *__stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data,
+	int needs_mailbox)
 {
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 
@@ -140,9 +160,11 @@
 
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
-	sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
-	if (!sub->mailbox) {
-		return NULL;
+	if (needs_mailbox) {
+		sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+		if (!sub->mailbox) {
+			return NULL;
+		}
 	}
 
 	ao2_ref(topic, +1);
@@ -157,6 +179,14 @@
 
 	ao2_ref(sub, +1);
 	return sub;
+}
+
+struct stasis_subscription *stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data)
+{
+	return __stasis_subscribe(topic, callback, data, 1);
 }
 
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
@@ -305,17 +335,8 @@
 static int dispatch_exec(void *data)
 {
 	RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
-	RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
-
-	/* Since sub->topic doesn't change, no need to lock sub */
-	ast_assert(dispatch->sub->topic != NULL);
-	ao2_ref(dispatch->sub->topic, +1);
-	sub_topic = dispatch->sub->topic;
-
-	dispatch->sub->callback(dispatch->sub->data,
-				dispatch->sub,
-				sub_topic,
-				dispatch->message);
+
+	subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
 
 	return 0;
 }
@@ -331,18 +352,28 @@
 
 	for (i = 0; i < topic->num_subscribers_current; ++i) {
 		struct stasis_subscription *sub = topic->subscribers[i];
-		RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
 
 		ast_assert(sub != NULL);
 
-		dispatch = dispatch_create(publisher_topic, message, sub);
-		if (!dispatch) {
-			ast_log(LOG_DEBUG, "Dropping dispatch\n");
-			break;
-		}
-
-		if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
-			dispatch = NULL; /* Ownership transferred to mailbox */
+		if (sub->mailbox) {
+			RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+			dispatch = dispatch_create(publisher_topic, message, sub);
+			if (!dispatch) {
+				ast_log(LOG_DEBUG, "Dropping dispatch\n");
+				break;
+			}
+
+			if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+				/* Ownership transferred to mailbox.
+				 * Don't increment ref, b/c the task processor
+				 * may have already gotten rid of the object.
+				 */
+				dispatch = NULL;
+			}
+		} else {
+			/* Dispatch directly */
+			subscription_invoke(sub, publisher_topic, message);
 		}
 	}
 }
@@ -370,7 +401,11 @@
 		return NULL;
 	}
 
-	sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
+	/* Forwarding subscriptions should dispatch directly instead of having a
+	 * mailbox. Otherwise, messages forwarded to the same topic from
+	 * different topics may get reordered. Which is bad.
+	 */
+	sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
 	if (sub) {
 		/* hold a ref to to_topic for this forwarding subscription */
 		ao2_ref(to_topic, +1);

Modified: trunk/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/tests/test_stasis.c?view=diff&rev=384413&r1=384412&r2=384413
==============================================================================
--- trunk/tests/test_stasis.c (original)
+++ trunk/tests/test_stasis.c Mon Apr  1 08:37:51 2013
@@ -391,7 +391,6 @@
 	return AST_TEST_PASS;
 }
 
-
 AST_TEST_DEFINE(forward)
 {
 	RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
@@ -454,6 +453,89 @@
 	ast_test_validate(test, 1 == actual_len);
 	actual_len = consumer_wait_for(parent_consumer, 1);
 	ast_test_validate(test, 1 == actual_len);
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(interleaving)
+{
+	RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+	int actual_len;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test sending interleaved events to a parent topic";
+		info->description = "Test sending events to a parent topic.\n"
+			"This test creates three topics (one parent, two children)\n"
+			"and publishes messages alternately between the children.\n"
+			"It verifies that the messages are received in the expected\n"
+			"order.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	test_message_type = stasis_message_type_create("test");
+	ast_test_validate(test, NULL != test_message_type);
+
+	test_data = ao2_alloc(1, NULL);
+	ast_test_validate(test, NULL != test_data);
+
+	test_message1 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message1);
+	test_message2 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message2);
+	test_message3 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message3);
+
+	parent_topic = stasis_topic_create("ParentTestTopic");
+	ast_test_validate(test, NULL != parent_topic);
+	topic1 = stasis_topic_create("Topic1");
+	ast_test_validate(test, NULL != topic1);
+	topic2 = stasis_topic_create("Topic2");
+	ast_test_validate(test, NULL != topic2);
+
+	forward_sub1 = stasis_forward_all(topic1, parent_topic);
+	ast_test_validate(test, NULL != forward_sub1);
+	forward_sub2 = stasis_forward_all(topic2, parent_topic);
+	ast_test_validate(test, NULL != forward_sub2);
+
+	consumer = consumer_create(1);
+	ast_test_validate(test, NULL != consumer);
+
+	sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
+	ast_test_validate(test, NULL != sub);
+	ao2_ref(consumer, +1);
+
+	stasis_publish(topic1, test_message1);
+	stasis_publish(topic2, test_message2);
+	stasis_publish(topic1, test_message3);
+
+	actual_len = consumer_wait_for(consumer, 3);
+	ast_test_validate(test, 3 == actual_len);
+
+	ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
+	ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
+	ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
 
 	return AST_TEST_PASS;
 }
@@ -829,6 +911,7 @@
 	AST_TEST_UNREGISTER(cache);
 	AST_TEST_UNREGISTER(route_conflicts);
 	AST_TEST_UNREGISTER(router);
+	AST_TEST_UNREGISTER(interleaving);
 	return 0;
 }
 
@@ -844,6 +927,7 @@
 	AST_TEST_REGISTER(cache);
 	AST_TEST_REGISTER(route_conflicts);
 	AST_TEST_REGISTER(router);
+	AST_TEST_REGISTER(interleaving);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 




More information about the asterisk-commits mailing list