[asterisk-commits] kmoore: trunk r383422 - /trunk/main/stasis.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 20 11:01:32 CDT 2013


Author: kmoore
Date: Wed Mar 20 11:01:30 2013
New Revision: 383422

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383422
Log:
Resolve a race condition in Stasis

Because of the way that topics were handled when publishing, it was
possible to dispatch a message to a subscription after that
subscription had been unsubscribed such that the dispatched message
arrived at the callback after the callback had received its final
message. In callbacks that cleaned up user data, this would often cause
a segfault. This has been resolved by locking the topic during the
entirety of dispatch. To prevent long publishing and topic locking
times, forwarding subscriptions have been made to be standard
subscriptions instead of mailboxless subscriptions which were
dispatched at publishing time.

Modified:
    trunk/main/stasis.c

Modified: trunk/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/stasis.c?view=diff&rev=383422&r1=383421&r2=383422
==============================================================================
--- trunk/main/stasis.c (original)
+++ trunk/main/stasis.c Wed Mar 20 11:01:30 2013
@@ -131,7 +131,7 @@
 
 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
 
-static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
+struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
 {
 	RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 	RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
@@ -148,11 +148,10 @@
 		return NULL;
 	}
 	ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
-	if (needs_mailbox) {
-		sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
-		if (!sub->mailbox) {
-			return NULL;
-		}
+
+	sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
+	if (!sub->mailbox) {
+		return NULL;
 	}
 
 	sub->uniqueid = ast_strdup(uniqueid);
@@ -168,11 +167,6 @@
 
 	ao2_ref(sub, +1);
 	return sub;
-}
-
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
-{
-	return __stasis_subscribe(topic, callback, data, 1);
 }
 
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
@@ -338,58 +332,29 @@
 
 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
 {
-	struct stasis_subscription **subscribers = NULL;
-	size_t num_subscribers, i;
+	size_t i;
+	SCOPED_AO2LOCK(lock, topic);
 
 	ast_assert(topic != NULL);
 	ast_assert(publisher_topic != NULL);
 	ast_assert(message != NULL);
 
-	/* Copy the subscribers, so we don't have to hold the mutex for long */
-	{
-		SCOPED_AO2LOCK(lock, topic);
-		num_subscribers = topic->num_subscribers_current;
-		subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
-		if (subscribers) {
-			for (i = 0; i < num_subscribers; ++i) {
-				ao2_ref(topic->subscribers[i], +1);
-				subscribers[i] = topic->subscribers[i];
-			}
+	for (i = 0; i < topic->num_subscribers_current; ++i) {
+		struct stasis_subscription *sub = topic->subscribers[i];
+		RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+		ast_assert(sub != NULL);
+
+		dispatch = dispatch_create(publisher_topic, message, sub);
+		if (!dispatch) {
+			ast_log(LOG_DEBUG, "Dropping dispatch\n");
+			break;
 		}
-	}
-
-	if (!subscribers) {
-		ast_log(LOG_ERROR, "Dropping message\n");
-		return;
-	}
-
-	for (i = 0; i < num_subscribers; ++i) {
-		struct stasis_subscription *sub = subscribers[i];
-
-		ast_assert(sub != NULL);
-
-		if (sub->mailbox) {
-			RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
-
-			dispatch = dispatch_create(publisher_topic, message, sub);
-			if (!dispatch) {
-				ast_log(LOG_DEBUG, "Dropping dispatch\n");
-				break;
-			}
-
-			if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
-				dispatch = NULL; /* Ownership transferred to mailbox */
-			}
-		} else {
-			/* No mailbox; dispatch directly */
-			sub->callback(sub->data, sub, sub->topic, message);
+
+		if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+			dispatch = NULL; /* Ownership transferred to mailbox */
 		}
 	}
-
-	for (i = 0; i < num_subscribers; ++i) {
-		ao2_cleanup(subscribers[i]);
-	}
-	ast_free(subscribers);
 }
 
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
@@ -414,8 +379,8 @@
 	if (!from_topic || !to_topic) {
 		return NULL;
 	}
-	/* Subscribe without a mailbox, since we're just forwarding messages */
-	sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
+
+	sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
 	if (sub) {
 		/* hold a ref to to_topic for this forwarding subscription */
 		ao2_ref(to_topic, +1);




More information about the asterisk-commits mailing list