[asterisk-commits] dlee: branch dlee/stasis-interleaving r384387 - in /team/dlee/stasis-interlea...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Mar 29 23:40:36 CDT 2013


Author: dlee
Date: Fri Mar 29 23:40:33 2013
New Revision: 384387

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=384387
Log:
Fixed message ordering issues when forwarding

Modified:
    team/dlee/stasis-interleaving/main/stasis.c
    team/dlee/stasis-interleaving/tests/test_stasis.c

Modified: team/dlee/stasis-interleaving/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-interleaving/main/stasis.c?view=diff&rev=384387&r1=384386&r2=384387
==============================================================================
--- team/dlee/stasis-interleaving/main/stasis.c (original)
+++ team/dlee/stasis-interleaving/main/stasis.c Fri Mar 29 23:40:33 2013
@@ -61,7 +61,6 @@
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
-struct stasis_subscription;
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 static void topic_dtor(void *obj)
@@ -127,9 +126,30 @@
 	sub->mailbox = NULL;
 }
 
+/*!
+ * \brief Invoke the subscription's callback.
+ * \param sub Subscription to invoke.
+ * \param topic Topic message was published to.
+ * \param message Message to send.
+ */
+static void subscription_invoke(struct stasis_subscription *sub,
+				  struct stasis_topic *topic,
+				  struct stasis_message *message)
+{
+	/* Since sub->topic doesn't change, no need to lock sub */
+	sub->callback(sub->data,
+		      sub,
+		      topic,
+		      message);
+}
+
 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
 
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+static struct stasis_subscription *__stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data,
+	int needs_mailbox)
 {
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 
@@ -140,9 +160,11 @@
 
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
-	sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
-	if (!sub->mailbox) {
-		return NULL;
+	if (needs_mailbox) {
+		sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+		if (!sub->mailbox) {
+			return NULL;
+		}
 	}
 
 	ao2_ref(topic, +1);
@@ -157,6 +179,14 @@
 
 	ao2_ref(sub, +1);
 	return sub;
+}
+
+struct stasis_subscription *stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data)
+{
+	return __stasis_subscribe(topic, callback, data, 1);
 }
 
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
@@ -305,17 +335,8 @@
 static int dispatch_exec(void *data)
 {
 	RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
-	RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
-
-	/* Since sub->topic doesn't change, no need to lock sub */
-	ast_assert(dispatch->sub->topic != NULL);
-	ao2_ref(dispatch->sub->topic, +1);
-	sub_topic = dispatch->sub->topic;
-
-	dispatch->sub->callback(dispatch->sub->data,
-				dispatch->sub,
-				sub_topic,
-				dispatch->message);
+
+	subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
 
 	return 0;
 }
@@ -331,18 +352,28 @@
 
 	for (i = 0; i < topic->num_subscribers_current; ++i) {
 		struct stasis_subscription *sub = topic->subscribers[i];
-		RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
 
 		ast_assert(sub != NULL);
 
-		dispatch = dispatch_create(publisher_topic, message, sub);
-		if (!dispatch) {
-			ast_log(LOG_DEBUG, "Dropping dispatch\n");
-			break;
-		}
-
-		if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
-			dispatch = NULL; /* Ownership transferred to mailbox */
+		if (sub->mailbox) {
+			RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+			dispatch = dispatch_create(publisher_topic, message, sub);
+			if (!dispatch) {
+				ast_log(LOG_DEBUG, "Dropping dispatch\n");
+				break;
+			}
+
+			if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+				/* Ownership transferred to mailbox.
+				 * Don't increment ref, b/c the task processor
+				 * may have already gotten rid of the object.
+				 */
+				dispatch = NULL;
+			}
+		} else {
+			/* Dispatch directly */
+			subscription_invoke(sub, publisher_topic, message);
 		}
 	}
 }
@@ -370,7 +401,11 @@
 		return NULL;
 	}
 
-	sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
+	/* Forwarding subscriptions should dispatch directly instead of having a
+	 * mailbox. Otherwise, messages forwarded to the same topic from
+	 * different topics may get reordered. Which is bad.
+	 */
+	sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
 	if (sub) {
 		/* hold a ref to to_topic for this forwarding subscription */
 		ao2_ref(to_topic, +1);

Modified: team/dlee/stasis-interleaving/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-interleaving/tests/test_stasis.c?view=diff&rev=384387&r1=384386&r2=384387
==============================================================================
--- team/dlee/stasis-interleaving/tests/test_stasis.c (original)
+++ team/dlee/stasis-interleaving/tests/test_stasis.c Fri Mar 29 23:40:33 2013
@@ -391,7 +391,6 @@
 	return AST_TEST_PASS;
 }
 
-
 AST_TEST_DEFINE(forward)
 {
 	RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
@@ -454,6 +453,89 @@
 	ast_test_validate(test, 1 == actual_len);
 	actual_len = consumer_wait_for(parent_consumer, 1);
 	ast_test_validate(test, 1 == actual_len);
+
+	return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(interleaving)
+{
+	RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+	RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+	int actual_len;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = __func__;
+		info->category = test_category;
+		info->summary = "Test sending interleaved events to a parent topic";
+		info->description = "Test sending events to a parent topic.\n"
+			"This test creates three topics (one parent, two children)\n"
+			"and publishes messages alternately between the children.\n"
+			"It verifies that the messages are received in the expected\n"
+			"order.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	test_message_type = stasis_message_type_create("test");
+	ast_test_validate(test, NULL != test_message_type);
+
+	test_data = ao2_alloc(1, NULL);
+	ast_test_validate(test, NULL != test_data);
+
+	test_message1 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message1);
+	test_message2 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message2);
+	test_message3 = stasis_message_create(test_message_type, test_data);
+	ast_test_validate(test, NULL != test_message3);
+
+	parent_topic = stasis_topic_create("ParentTestTopic");
+	ast_test_validate(test, NULL != parent_topic);
+	topic1 = stasis_topic_create("Topic1");
+	ast_test_validate(test, NULL != topic1);
+	topic2 = stasis_topic_create("Topic2");
+	ast_test_validate(test, NULL != topic2);
+
+	forward_sub1 = stasis_forward_all(topic1, parent_topic);
+	ast_test_validate(test, NULL != forward_sub1);
+	forward_sub2 = stasis_forward_all(topic2, parent_topic);
+	ast_test_validate(test, NULL != forward_sub2);
+
+	consumer = consumer_create(1);
+	ast_test_validate(test, NULL != consumer);
+
+	sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
+	ast_test_validate(test, NULL != sub);
+	ao2_ref(consumer, +1);
+
+	stasis_publish(topic1, test_message1);
+	stasis_publish(topic2, test_message2);
+	stasis_publish(topic1, test_message3);
+
+	actual_len = consumer_wait_for(consumer, 3);
+	ast_test_validate(test, 3 == actual_len);
+
+	ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
+	ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
+	ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
 
 	return AST_TEST_PASS;
 }
@@ -829,6 +911,7 @@
 	AST_TEST_UNREGISTER(cache);
 	AST_TEST_UNREGISTER(route_conflicts);
 	AST_TEST_UNREGISTER(router);
+	AST_TEST_UNREGISTER(interleaving);
 	return 0;
 }
 
@@ -844,6 +927,7 @@
 	AST_TEST_REGISTER(cache);
 	AST_TEST_REGISTER(route_conflicts);
 	AST_TEST_REGISTER(router);
+	AST_TEST_REGISTER(interleaving);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 




More information about the asterisk-commits mailing list