[asterisk-commits] dlee: branch dlee/tp-local r399779 - /team/dlee/tp-local/main/stasis.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Sep 25 13:09:40 CDT 2013


Author: dlee
Date: Wed Sep 25 13:09:38 2013
New Revision: 399779

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399779
Log:
Stasis using tp-local

Modified:
    team/dlee/tp-local/main/stasis.c

Modified: team/dlee/tp-local/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/tp-local/main/stasis.c?view=diff&rev=399779&r1=399778&r2=399779
==============================================================================
--- team/dlee/tp-local/main/stasis.c (original)
+++ team/dlee/tp-local/main/stasis.c Wed Sep 25 13:09:38 2013
@@ -301,6 +301,9 @@
 		if (!sub->mailbox) {
 			return NULL;
 		}
+		ast_taskprocessor_set_local(sub->mailbox, sub);
+		/* Taskprocessor has a reference */
+		ao2_ref(sub, +1);
 	}
 
 	ao2_ref(topic, +1);
@@ -327,6 +330,13 @@
 	return internal_stasis_subscribe(topic, callback, data, 1);
 }
 
+static int sub_cleanup(void *data)
+{
+	struct stasis_subscription *sub = data;
+	ao2_cleanup(sub);
+	return 0;
+}
+
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
 	/* The subscription may be the last ref to this topic. Hold
@@ -348,6 +358,11 @@
 
 	/* Now let everyone know about the unsubscribe */
 	send_subscription_unsubscribe(topic, sub);
+
+	/* When all that's done, remove the ref the mailbox has on the sub */
+	if (sub->mailbox) {
+		ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+	}
 
 	/* Unsubscribing unrefs the subscription */
 	ao2_cleanup(sub);
@@ -484,28 +499,24 @@
 	struct stasis_topic *topic;
 	/*! The message itself */
 	struct stasis_message *message;
-	/*! Subscription receiving the message */
-	struct stasis_subscription *sub;
 };
 
-static void dispatch_dtor(struct dispatch *dispatch)
-{
+static void dispatch_dtor(void *obj)
+{
+	struct dispatch *dispatch = obj;
+
 	ao2_cleanup(dispatch->topic);
 	ao2_cleanup(dispatch->message);
-	ao2_cleanup(dispatch->sub);
-
-	ast_free(dispatch);
-}
-
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+}
+
+static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message)
 {
 	struct dispatch *dispatch;
 
 	ast_assert(topic != NULL);
 	ast_assert(message != NULL);
-	ast_assert(sub != NULL);
-
-	dispatch = ast_malloc(sizeof(*dispatch));
+
+	dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
 	if (!dispatch) {
 		return NULL;
 	}
@@ -515,9 +526,6 @@
 
 	dispatch->message = message;
 	ao2_ref(message, +1);
-
-	dispatch->sub = sub;
-	ao2_ref(sub, +1);
 
 	return dispatch;
 }
@@ -527,34 +535,45 @@
  * \param data \ref dispatch object
  * \return 0
  */
-static int dispatch_exec(void *data)
-{
-	struct dispatch *dispatch = data;
-
-	subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
-	dispatch_dtor(dispatch);
+static int dispatch_exec(struct ast_taskprocessor_local *local)
+{
+	struct stasis_subscription *sub = local->local_data;
+	struct dispatch *dispatch = local->data;
+
+	subscription_invoke(sub, dispatch->topic, dispatch->message);
+	ao2_cleanup(dispatch);
 
 	return 0;
 }
 
 static void dispatch_message(struct stasis_subscription *sub,
+	struct dispatch *dispatch)
+{
+	if (sub->mailbox) {
+		ao2_bump(dispatch);
+		if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, dispatch) != 0) {
+			/* Push failed; ugh. */
+			ast_log(LOG_DEBUG, "Dropping dispatch\n");
+			ao2_cleanup(dispatch);
+		}
+	} else {
+		/* Dispatch directly */
+		subscription_invoke(sub, dispatch->topic, dispatch->message);
+	}
+}
+
+static void dispatch_single_message(struct stasis_subscription *sub,
 	struct stasis_topic *publisher_topic, struct stasis_message *message)
 {
 	if (sub->mailbox) {
-		struct dispatch *dispatch;
-
-		dispatch = dispatch_create(publisher_topic, message, sub);
+		RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+		dispatch = dispatch_create(publisher_topic, message);
 		if (!dispatch) {
 			ast_log(LOG_DEBUG, "Dropping dispatch\n");
 			return;
 		}
 
-		if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
-			/* Push failed; just delete the dispatch.
-			 */
-			ast_log(LOG_DEBUG, "Dropping dispatch\n");
-			dispatch_dtor(dispatch);
-		}
+		dispatch_message(sub, dispatch);
 	} else {
 		/* Dispatch directly */
 		subscription_invoke(sub, publisher_topic, message);
@@ -568,18 +587,29 @@
 	 * Make sure we hold onto a reference while dispatching. */
 	RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
 		ao2_cleanup);
+	RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
 	SCOPED_AO2LOCK(lock, topic);
 
 	ast_assert(topic != NULL);
 	ast_assert(publisher_topic != NULL);
 	ast_assert(message != NULL);
 
+	if (ast_vector_size(topic->subscribers) == 0) {
+		return;
+	}
+
+	dispatch = dispatch_create(publisher_topic, message);
+	if (!dispatch) {
+		ast_log(LOG_DEBUG, "Dropping dispatch\n");
+		return;
+	}
+
 	for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
 		struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
 
 		ast_assert(sub != NULL);
 
-		dispatch_message(sub, publisher_topic, message);
+		dispatch_message(sub, dispatch);
 
 	}
 }
@@ -738,8 +768,7 @@
 
 	stasis_publish(topic, msg);
 
-	/* Now we have to dispatch to the subscription itself */
-	dispatch_message(sub, topic, msg);
+	dispatch_single_message(sub, topic, msg);
 }
 
 struct topic_pool_entry {




More information about the asterisk-commits mailing list