[svn-commits] dlee: branch dlee/stasis-forward-optimization r399662 - /team/dlee/stasis-for...
    SVN commits to the Digium repositories 
    svn-commits at lists.digium.com
       
    Tue Sep 24 11:09:47 CDT 2013
    
    
  
Author: dlee
Date: Tue Sep 24 11:09:45 2013
New Revision: 399662
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399662
Log:
Attempt at fixing unsubscribe delivery
Modified:
    team/dlee/stasis-forward-optimization/main/stasis.c
Modified: team/dlee/stasis-forward-optimization/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis.c?view=diff&rev=399662&r1=399661&r2=399662
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis.c Tue Sep 24 11:09:45 2013
@@ -270,7 +270,8 @@
 	}
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 struct stasis_subscription *internal_stasis_subscribe(
 	struct stasis_topic *topic,
@@ -312,7 +313,7 @@
 	if (topic_add_subscription(topic, sub) != 0) {
 		return NULL;
 	}
-	send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+	send_subscription_subscribe(topic, sub);
 
 	ao2_ref(sub, +1);
 	return sub;
@@ -337,18 +338,16 @@
 		return NULL;
 	}
 
-	/* While we don't know if the removal will be successful, we have to
-	 * send the Unsubscribe first, so sub will get its final message.
-	 * Since failure is a serious coding problem that shouldn't happen,
-	 * shouldn't be a big deal.
-	 */
-	send_subscription_change_message(topic,	sub->uniqueid, "Unsubscribe");
-
+	/* We have to remove the subscription first, to ensure the unsubscribe
+	 * is the final message */
 	if (topic_remove_subscription(sub->topic, sub) != 0) {
 		ast_log(LOG_ERROR,
 			"Internal error: subscription has invalid topic\n");
 		return NULL;
 	}
+
+	/* Now let everyone know about the unsubscribe */
+	send_subscription_unsubscribe(topic, sub);
 
 	/* Unsubscribing unrefs the subscription */
 	ao2_cleanup(sub);
@@ -538,6 +537,30 @@
 	return 0;
 }
 
+static void dispatch_message(struct stasis_subscription *sub,
+	struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+	if (sub->mailbox) {
+		struct dispatch *dispatch;
+
+		dispatch = dispatch_create(publisher_topic, message, sub);
+		if (!dispatch) {
+			ast_log(LOG_DEBUG, "Dropping dispatch\n");
+			return;
+		}
+
+		if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
+			/* Push failed; just delete the dispatch.
+			 */
+			ast_log(LOG_DEBUG, "Dropping dispatch\n");
+			dispatch_dtor(dispatch);
+		}
+	} else {
+		/* Dispatch directly */
+		subscription_invoke(sub, publisher_topic, message);
+	}
+}
+
 void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
 {
 	size_t i;
@@ -556,25 +579,8 @@
 
 		ast_assert(sub != NULL);
 
-		if (sub->mailbox) {
-			struct dispatch *dispatch;
-
-			dispatch = dispatch_create(publisher_topic, message, sub);
-			if (!dispatch) {
-				ast_log(LOG_DEBUG, "Dropping dispatch\n");
-				break;
-			}
-
-			if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
-				/* Push failed; just delete the dispatch.
-				 */
-				ast_log(LOG_DEBUG, "Dropping dispatch\n");
-				dispatch_dtor(dispatch);
-			}
-		} else {
-			/* Dispatch directly */
-			subscription_invoke(sub, publisher_topic, message);
-		}
+		dispatch_message(sub, publisher_topic, message);
+
 	}
 }
 
@@ -668,7 +674,7 @@
 	ao2_cleanup(change->topic);
 }
 
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
 {
 	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
 
@@ -686,12 +692,15 @@
 	return change;
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
 	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
-	change = subscription_change_alloc(topic, uniqueid, description);
+	/* This assumes that we have already subscribed */
+	ast_assert(stasis_subscription_is_subscribed(sub));
+
+	change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
 
 	if (!change) {
 		return;
@@ -704,6 +713,33 @@
 	}
 
 	stasis_publish(topic, msg);
+}
+
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+	struct stasis_subscription *sub)
+{
+	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+	/* This assumes that we have already unsubscribed */
+	ast_assert(!stasis_subscription_is_subscribed(sub));
+
+	change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+	if (!change) {
+		return;
+	}
+
+	msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+	if (!msg) {
+		return;
+	}
+
+	stasis_publish(topic, msg);
+
+	/* Now we have to dispatch to the subscription itself */
+	dispatch_message(sub, topic, msg);
 }
 
 struct topic_pool_entry {
    
    
More information about the svn-commits
mailing list