[asterisk-commits] dlee: branch group/performance r399678 - in /team/group/performance: ./ apps/...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Sep 24 13:23:29 CDT 2013


Author: dlee
Date: Tue Sep 24 13:23:25 2013
New Revision: 399678

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399678
Log:
Merged ^/team/dlee/stasis-forward-optimization at 399664

Added:
    team/group/performance/include/asterisk/vector.h
      - copied unchanged from r399667, team/dlee/stasis-forward-optimization/include/asterisk/vector.h
Modified:
    team/group/performance/   (props changed)
    team/group/performance/apps/app_queue.c
    team/group/performance/include/asterisk/stasis.h
    team/group/performance/main/cdr.c
    team/group/performance/main/cel.c
    team/group/performance/main/channel_internal_api.c
    team/group/performance/main/manager.c
    team/group/performance/main/manager_bridges.c
    team/group/performance/main/manager_channels.c
    team/group/performance/main/manager_mwi.c
    team/group/performance/main/manager_system.c
    team/group/performance/main/stasis.c
    team/group/performance/main/stasis_cache_pattern.c
    team/group/performance/res/stasis/app.c
    team/group/performance/tests/test_stasis.c
    team/group/performance/tests/test_stasis_endpoints.c

Propchange: team/group/performance/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Sep 24 13:23:25 2013
@@ -1,1 +1,1 @@
-/team/dlee/performance:1-399659 /team/dlee/taskprocessor-optimization:1-399654
+/team/dlee/performance:1-399659 /team/dlee/taskprocessor-optimization:1-399654 /team/dlee/stasis-forward-optimization:1-399664

Modified: team/group/performance/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/apps/app_queue.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/apps/app_queue.c (original)
+++ team/group/performance/apps/app_queue.c Tue Sep 24 13:23:25 2013
@@ -10327,7 +10327,7 @@
 };
 
 static struct stasis_message_router *agent_router;
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 static int unload_module(void)
 {
@@ -10355,7 +10355,7 @@
 		stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
 	}
 	stasis_message_router_unsubscribe_and_join(agent_router);
-	topic_forwarder = stasis_unsubscribe(topic_forwarder);
+	topic_forwarder = stasis_forward_cancel(topic_forwarder);
 
 	STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);

Modified: team/group/performance/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/include/asterisk/stasis.h?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/include/asterisk/stasis.h (original)
+++ team/group/performance/include/asterisk/stasis.h Tue Sep 24 13:23:25 2013
@@ -464,6 +464,8 @@
 struct stasis_subscription *stasis_unsubscribe_and_join(
 	struct stasis_subscription *subscription);
 
+struct stasis_forward;
+
 /*!
  * \brief Create a subscription which forwards all messages from one topic to
  * another.
@@ -477,8 +479,10 @@
  * \return \c NULL on error.
  * \since 12
  */
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
 	struct stasis_topic *to_topic);
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
 
 /*!
  * \brief Get the unique ID for the subscription.

Modified: team/group/performance/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/cdr.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/cdr.c (original)
+++ team/group/performance/main/cdr.c Tue Sep 24 13:23:25 2013
@@ -334,13 +334,13 @@
 static struct stasis_message_router *stasis_router;
 
 /*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
 
 /*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
 
 /*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
 
 /*! \brief The parent topic for all topics we want to aggregate for CDRs */
 static struct stasis_topic *cdr_topic;
@@ -3965,9 +3965,9 @@
 
 static void cdr_engine_cleanup(void)
 {
-	channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
-	bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
-	parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+	channel_subscription = stasis_forward_cancel(channel_subscription);
+	bridge_subscription = stasis_forward_cancel(bridge_subscription);
+	parking_subscription = stasis_forward_cancel(parking_subscription);
 	stasis_message_router_unsubscribe_and_join(stasis_router);
 	ao2_cleanup(cdr_topic);
 	cdr_topic = NULL;

Modified: team/group/performance/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/cel.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/cel.c (original)
+++ team/group/performance/main/cel.c Tue Sep 24 13:23:25 2013
@@ -121,16 +121,16 @@
 static struct stasis_topic *cel_aggregation_topic;
 
 /*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
 
 /*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
 
 /*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
 
 /*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
 
 struct stasis_message_type *cel_generic_type(void);
 STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1343,10 +1343,10 @@
 	cel_aggregation_topic = NULL;
 	ao2_cleanup(cel_topic);
 	cel_topic = NULL;
-	cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
-	cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
-	cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
-	cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+	cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+	cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+	cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+	cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
 	ast_cli_unregister(&cli_status);
 	ao2_cleanup(cel_dialstatus_store);
 	cel_dialstatus_store = NULL;

Modified: team/group/performance/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/channel_internal_api.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/channel_internal_api.c (original)
+++ team/group/performance/main/channel_internal_api.c Tue Sep 24 13:23:25 2013
@@ -207,8 +207,7 @@
 	char sending_dtmf_digit;			/*!< Digit this channel is currently sending out. (zero if not sending) */
 	struct timeval sending_dtmf_tv;		/*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
 	struct stasis_cp_single *topics;		/*!< Topic for all channel's events */
-	struct stasis_subscription *forwarder;		/*!< Subscription for event forwarding to all topic */
-	struct stasis_subscription *endpoint_forward;	/*!< Subscription for event forwarding to endpoint's topic */
+	struct stasis_forward *endpoint_forward;	/*!< Subscription for event forwarding to endpoint's topic */
 };
 
 /*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@
 
 	ast_string_field_free_memory(chan);
 
-	chan->forwarder = stasis_unsubscribe(chan->forwarder);
-	chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+	chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
 
 	stasis_cp_single_unsubscribe(chan->topics);
 	chan->topics = NULL;

Modified: team/group/performance/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager.c (original)
+++ team/group/performance/main/manager.c Tue Sep 24 13:23:25 2013
@@ -1126,7 +1126,7 @@
 static struct stasis_message_router *stasis_router;
 
 /*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
 
 #define MGR_SHOW_TERMINAL_WIDTH 80
 
@@ -7755,7 +7755,7 @@
 		stasis_message_router_unsubscribe_and_join(stasis_router);
 		stasis_router = NULL;
 	}
-	stasis_unsubscribe_and_join(rtp_topic_forwarder);
+	stasis_forward_cancel(rtp_topic_forwarder);
 	rtp_topic_forwarder = NULL;
 	ao2_cleanup(manager_topic);
 	manager_topic = NULL;

Modified: team/group/performance/main/manager_bridges.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_bridges.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_bridges.c (original)
+++ team/group/performance/main/manager_bridges.c Tue Sep 24 13:23:25 2013
@@ -106,7 +106,7 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 struct ast_str *ast_manager_build_bridge_state_string_prefix(
 	const struct ast_bridge_snapshot *snapshot,
@@ -456,7 +456,7 @@
 
 static void manager_bridging_cleanup(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/group/performance/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_channels.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_channels.c (original)
+++ team/group/performance/main/manager_channels.c Tue Sep 24 13:23:25 2013
@@ -370,7 +370,7 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 struct ast_str *ast_manager_build_channel_state_string_prefix(
 		const struct ast_channel_snapshot *snapshot,
@@ -1100,7 +1100,7 @@
 
 static void manager_channels_shutdown(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/group/performance/main/manager_mwi.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_mwi.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_mwi.c (original)
+++ team/group/performance/main/manager_mwi.c Tue Sep 24 13:23:25 2013
@@ -41,7 +41,7 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 /*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
 static int exclude_event_cb(const char *key)
@@ -149,7 +149,7 @@
 
 static void manager_mwi_shutdown(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/group/performance/main/manager_system.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_system.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_system.c (original)
+++ team/group/performance/main/manager_system.c Tue Sep 24 13:23:25 2013
@@ -34,11 +34,11 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the system topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 static void manager_system_shutdown(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/group/performance/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/stasis.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/stasis.c (original)
+++ team/group/performance/main/stasis.c Tue Sep 24 13:23:25 2013
@@ -29,7 +29,7 @@
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
@@ -37,6 +37,7 @@
 #include "asterisk/taskprocessor.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
+#include "asterisk/vector.h"
 
 /*!
  * \page stasis-impl Stasis Implementation Notes
@@ -139,15 +140,17 @@
 struct stasis_topic {
 	char *name;
 	/*! Variable length array of the subscribers */
-	struct stasis_subscription **subscribers;
-	/*! Allocated length of the subscribers array */
-	size_t num_subscribers_max;
-	/*! Current size of the subscribers array */
-	size_t num_subscribers_current;
+	ast_vector(struct stasis_subscription *) subscribers;
+
+	/*! Topics forwarding into this topic */
+	ast_vector(struct stasis_topic *) upstream_topics;
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+	struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 static void topic_dtor(void *obj)
 {
@@ -155,16 +158,18 @@
 
 	/* Subscribers hold a reference to topics, so they should all be
 	 * unsubscribed before we get here. */
-	ast_assert(topic->num_subscribers_current == 0);
+	ast_assert(ast_vector_size(topic->subscribers) == 0);
 	ast_free(topic->name);
 	topic->name = NULL;
-	ast_free(topic->subscribers);
-	topic->subscribers = NULL;
+
+	ast_vector_free(topic->subscribers);
+	ast_vector_free(topic->upstream_topics);
 }
 
 struct stasis_topic *stasis_topic_create(const char *name)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	int res = 0;
 
 	topic = ao2_alloc(sizeof(*topic), topic_dtor);
 
@@ -177,9 +182,10 @@
 		return NULL;
 	}
 
-	topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
-	topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
-	if (!topic->subscribers) {
+	res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+	res |= ast_vector_init(topic->upstream_topics, 0);
+
+	if (res != 0) {
 		return NULL;
 	}
 
@@ -264,7 +270,8 @@
 	}
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 struct stasis_subscription *internal_stasis_subscribe(
 	struct stasis_topic *topic,
@@ -306,7 +313,7 @@
 	if (topic_add_subscription(topic, sub) != 0) {
 		return NULL;
 	}
-	send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+	send_subscription_subscribe(topic, sub);
 
 	ao2_ref(sub, +1);
 	return sub;
@@ -322,27 +329,28 @@
 
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
-	if (sub) {
-		size_t i;
-		/* The subscription may be the last ref to this topic. Hold
-		 * the topic ref open until after the unlock. */
-		RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
-			ao2_cleanup);
-		SCOPED_AO2LOCK(lock_topic, topic);
-
-		for (i = 0; i < topic->num_subscribers_current; ++i) {
-			if (topic->subscribers[i] == sub) {
-				send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
-				/* swap [i] with last entry; remove last entry */
-				topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
-				/* Unsubscribing unrefs the subscription */
-				ao2_cleanup(sub);
-				return NULL;
-			}
-		}
-
-		ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
-	}
+	/* The subscription may be the last ref to this topic. Hold
+	 * the topic ref open until after the unlock. */
+	RAII_VAR(struct stasis_topic *, topic,
+		ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
+
+	if (!sub) {
+		return NULL;
+	}
+
+	/* We have to remove the subscription first, to ensure the unsubscribe
+	 * is the final message */
+	if (topic_remove_subscription(sub->topic, sub) != 0) {
+		ast_log(LOG_ERROR,
+			"Internal error: subscription has invalid topic\n");
+		return NULL;
+	}
+
+	/* Now let everyone know about the unsubscribe */
+	send_subscription_unsubscribe(topic, sub);
+
+	/* Unsubscribing unrefs the subscription */
+	ao2_cleanup(sub);
 	return NULL;
 }
 
@@ -392,8 +400,8 @@
 		struct stasis_topic *topic = sub->topic;
 		SCOPED_AO2LOCK(lock_topic, topic);
 
-		for (i = 0; i < topic->num_subscribers_current; ++i) {
-			if (topic->subscribers[i] == sub) {
+		for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+			if (ast_vector_get(topic->subscribers, i) == sub) {
 				return 1;
 			}
 		}
@@ -435,18 +443,8 @@
  */
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-	struct stasis_subscription **subscribers;
+	size_t idx;
 	SCOPED_AO2LOCK(lock, topic);
-
-	/* Increase list size, if needed */
-	if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
-		subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
-		if (!subscribers) {
-			return -1;
-		}
-		topic->subscribers = subscribers;
-		topic->num_subscribers_max *= 2;
-	}
 
 	/* The reference from the topic to the subscription is shared with
 	 * the owner of the subscription, which will explicitly unsubscribe
@@ -454,8 +452,27 @@
 	 *
 	 * If we bumped the refcount here, the owner would have to unsubscribe
 	 * and cleanup, which is a bit awkward. */
-	topic->subscribers[topic->num_subscribers_current++] = sub;
+	ast_vector_append(topic->subscribers, sub);
+
+	for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+		topic_add_subscription(
+			ast_vector_get(topic->upstream_topics, idx), sub);
+	}
+
 	return 0;
+}
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+	size_t idx;
+	SCOPED_AO2LOCK(lock_topic, topic);
+
+	for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+		topic_remove_subscription(
+			ast_vector_get(topic->upstream_topics, idx), sub);
+	}
+
+	return ast_vector_remove_elem_unordered(topic->subscribers, sub);
 }
 
 /*!
@@ -520,6 +537,30 @@
 	return 0;
 }
 
+static void dispatch_message(struct stasis_subscription *sub,
+	struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+	if (sub->mailbox) {
+		struct dispatch *dispatch;
+
+		dispatch = dispatch_create(publisher_topic, message, sub);
+		if (!dispatch) {
+			ast_log(LOG_DEBUG, "Dropping dispatch\n");
+			return;
+		}
+
+		if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
+			/* Push failed; just delete the dispatch.
+			 */
+			ast_log(LOG_DEBUG, "Dropping dispatch\n");
+			dispatch_dtor(dispatch);
+		}
+	} else {
+		/* Dispatch directly */
+		subscription_invoke(sub, publisher_topic, message);
+	}
+}
+
 void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
 {
 	size_t i;
@@ -533,66 +574,97 @@
 	ast_assert(publisher_topic != NULL);
 	ast_assert(message != NULL);
 
-	for (i = 0; i < topic->num_subscribers_current; ++i) {
-		struct stasis_subscription *sub = topic->subscribers[i];
+	for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+		struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
 
 		ast_assert(sub != NULL);
 
-		if (sub->mailbox) {
-			struct dispatch *dispatch;
-
-			dispatch = dispatch_create(publisher_topic, message, sub);
-			if (!dispatch) {
-				ast_log(LOG_DEBUG, "Dropping dispatch\n");
-				break;
+		dispatch_message(sub, publisher_topic, message);
+
+	}
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+	stasis_forward_message(topic, topic, message);
+}
+
+struct stasis_forward {
+	struct stasis_topic *from_topic;
+	struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
+{
+	struct stasis_forward *forward = obj;
+
+	ao2_cleanup(forward->from_topic);
+	forward->from_topic = NULL;
+	ao2_cleanup(forward->to_topic);
+	forward->to_topic = NULL;
+}
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
+{
+	if (forward) {
+		int idx;
+
+		struct stasis_topic *from = forward->from_topic;
+		struct stasis_topic *to = forward->to_topic;
+
+		SCOPED_AO2LOCK(to_lock, to);
+
+		ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+		ao2_lock(from);
+		for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+			topic_remove_subscription(
+				from, ast_vector_get(to->subscribers, idx));
+		}
+		ao2_unlock(from);
+	}
+
+	ao2_cleanup(forward);
+
+	return NULL;
+}
+
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+	struct stasis_topic *to_topic)
+{
+	RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+
+	if (!from_topic || !to_topic) {
+		return NULL;
+	}
+
+	forward = ao2_alloc(sizeof(*forward), forward_dtor);
+	if (!forward) {
+		return NULL;
+	}
+
+	forward->from_topic = ao2_bump(from_topic);
+	forward->to_topic = ao2_bump(to_topic);
+
+	{
+		SCOPED_AO2LOCK(lock, to_topic);
+		int res;
+
+		res = ast_vector_append(to_topic->upstream_topics, from_topic);
+		if (res != 0) {
+			return NULL;
+		}
+
+		{
+			SCOPED_AO2LOCK(lock, from_topic);
+			size_t idx;
+			for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+				topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
 			}
-
-			if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
-				/* Push failed; just delete the dispatch.
-				 */
-				ast_log(LOG_DEBUG, "Dropping dispatch\n");
-				dispatch_dtor(dispatch);
-			}
-		} else {
-			/* Dispatch directly */
-			subscription_invoke(sub, publisher_topic, message);
 		}
 	}
-}
-
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
-{
-	stasis_forward_message(topic, topic, message);
-}
-
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
-{
-	struct stasis_topic *to_topic = data;
-	stasis_forward_message(to_topic, topic, message);
-
-	if (stasis_subscription_final_message(sub, message)) {
-		ao2_cleanup(to_topic);
-	}
-}
-
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
-{
-	struct stasis_subscription *sub;
-	if (!from_topic || !to_topic) {
-		return NULL;
-	}
-
-	/* Forwarding subscriptions should dispatch directly instead of having a
-	 * mailbox. Otherwise, messages forwarded to the same topic from
-	 * different topics may get reordered. Which is bad.
-	 */
-	sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
-	if (sub) {
-		/* hold a ref to to_topic for this forwarding subscription */
-		ao2_ref(to_topic, +1);
-	}
-	return sub;
+
+	return ao2_bump(forward);
 }
 
 static void subscription_change_dtor(void *obj)
@@ -602,7 +674,7 @@
 	ao2_cleanup(change->topic);
 }
 
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
 {
 	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
 
@@ -620,12 +692,15 @@
 	return change;
 }
 
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
 	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
-	change = subscription_change_alloc(topic, uniqueid, description);
+	/* This assumes that we have already unsubscribed */
+	ast_assert(stasis_subscription_is_subscribed(sub));
+
+	change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
 
 	if (!change) {
 		return;
@@ -640,15 +715,42 @@
 	stasis_publish(topic, msg);
 }
 
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+	struct stasis_subscription *sub)
+{
+	RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+	/* This assumes that we have already unsubscribed */
+	ast_assert(!stasis_subscription_is_subscribed(sub));
+
+	change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+	if (!change) {
+		return;
+	}
+
+	msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+	if (!msg) {
+		return;
+	}
+
+	stasis_publish(topic, msg);
+
+	/* Now we have to dispatch to the subscription itself */
+	dispatch_message(sub, topic, msg);
+}
+
 struct topic_pool_entry {
-	struct stasis_subscription *forward;
+	struct stasis_forward *forward;
 	struct stasis_topic *topic;
 };
 
 static void topic_pool_entry_dtor(void *obj)
 {
 	struct topic_pool_entry *entry = obj;
-	entry->forward = stasis_unsubscribe(entry->forward);
+	entry->forward = stasis_forward_cancel(entry->forward);
 	ao2_cleanup(entry->topic);
 	entry->topic = NULL;
 }

Modified: team/group/performance/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/stasis_cache_pattern.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/stasis_cache_pattern.c (original)
+++ team/group/performance/main/stasis_cache_pattern.c Tue Sep 24 13:23:25 2013
@@ -39,15 +39,15 @@
 	struct stasis_topic *topic_cached;
 	struct stasis_cache *cache;
 
-	struct stasis_subscription *forward_all_to_cached;
+	struct stasis_forward *forward_all_to_cached;
 };
 
 struct stasis_cp_single {
 	struct stasis_topic *topic;
 	struct stasis_caching_topic *topic_cached;
 
-	struct stasis_subscription *forward_topic_to_all;
-	struct stasis_subscription *forward_cached_to_all;
+	struct stasis_forward *forward_topic_to_all;
+	struct stasis_forward *forward_cached_to_all;
 };
 
 static void all_dtor(void *obj)
@@ -60,7 +60,7 @@
 	all->topic_cached = NULL;
 	ao2_cleanup(all->cache);
 	all->cache = NULL;
-	stasis_unsubscribe_and_join(all->forward_all_to_cached);
+	stasis_forward_cancel(all->forward_all_to_cached);
 	all->forward_all_to_cached = NULL;
 }
 
@@ -172,9 +172,9 @@
 		return;
 	}
 
-	stasis_unsubscribe(one->forward_topic_to_all);
+	stasis_forward_cancel(one->forward_topic_to_all);
 	one->forward_topic_to_all = NULL;
-	stasis_unsubscribe(one->forward_cached_to_all);
+	stasis_forward_cancel(one->forward_cached_to_all);
 	one->forward_cached_to_all = NULL;
 	stasis_caching_unsubscribe(one->topic_cached);
 	one->topic_cached = NULL;

Modified: team/group/performance/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/res/stasis/app.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/res/stasis/app.c (original)
+++ team/group/performance/res/stasis/app.c Tue Sep 24 13:23:25 2013
@@ -58,9 +58,9 @@
 	int interested;
 
 	/*! Forward for the regular topic */
-	struct stasis_subscription *topic_forward;
+	struct stasis_forward *topic_forward;
 	/*! Forward for the caching topic */
-	struct stasis_subscription *topic_cached_forward;
+	struct stasis_forward *topic_cached_forward;
 
 	/*! Unique id of the object being forwarded */
 	char id[];
@@ -78,9 +78,9 @@
 
 static void forwards_unsubscribe(struct app_forwards *forwards)
 {
-	stasis_unsubscribe(forwards->topic_forward);
+	stasis_forward_cancel(forwards->topic_forward);
 	forwards->topic_forward = NULL;
-	stasis_unsubscribe(forwards->topic_cached_forward);
+	stasis_forward_cancel(forwards->topic_cached_forward);
 	forwards->topic_cached_forward = NULL;
 }
 
@@ -129,7 +129,7 @@
 		ast_channel_topic_cached(chan), app->topic);
 	if (!forwards->topic_cached_forward) {
 		/* Half-subscribed is a bad thing */
-		stasis_unsubscribe(forwards->topic_forward);
+		stasis_forward_cancel(forwards->topic_forward);
 		forwards->topic_forward = NULL;
 		return NULL;
 	}
@@ -163,7 +163,7 @@
 		ast_bridge_topic_cached(bridge), app->topic);
 	if (!forwards->topic_cached_forward) {
 		/* Half-subscribed is a bad thing */
-		stasis_unsubscribe(forwards->topic_forward);
+		stasis_forward_cancel(forwards->topic_forward);
 		forwards->topic_forward = NULL;
 		return NULL;
 	}

Modified: team/group/performance/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/tests/test_stasis.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/tests/test_stasis.c (original)
+++ team/group/performance/tests/test_stasis.c Tue Sep 24 13:23:25 2013
@@ -427,7 +427,7 @@
 	RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 
-	RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
 	RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
 
@@ -499,8 +499,8 @@
 	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
 
-	RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
-	RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+	RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+	RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
 	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
 
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);

Modified: team/group/performance/tests/test_stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/tests/test_stasis_endpoints.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/tests/test_stasis_endpoints.c (original)
+++ team/group/performance/tests/test_stasis_endpoints.c Tue Sep 24 13:23:25 2013
@@ -264,11 +264,14 @@
 	type = stasis_message_type(msg);
 	ast_test_validate(test, ast_channel_snapshot_type() == type);
 
+	/* The ordering of the cache clear and endpoint snapshot are
+	 * unspecified */
 	msg = sink->messages[3];
-	type = stasis_message_type(msg);
-	ast_test_validate(test, stasis_cache_clear_type() == type);
-
-	msg = sink->messages[4];
+	if (stasis_message_type(msg) == stasis_cache_clear_type()) {
+		/* Okay; the next message should be the endpoint snapshot */
+		msg = sink->messages[4];
+	}
+
 	type = stasis_message_type(msg);
 	ast_test_validate(test, ast_endpoint_snapshot_type() == type);
 	actual_snapshot = stasis_message_data(msg);




More information about the asterisk-commits mailing list