[asterisk-commits] dlee: branch dlee/stasis-forward-optimization r399618 - in /team/dlee/stasis-...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Sat Sep 21 10:13:41 CDT 2013


Author: dlee
Date: Sat Sep 21 10:13:36 2013
New Revision: 399618

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399618
Log:
At least it compiles

Added:
    team/dlee/stasis-forward-optimization/include/asterisk/vector.h   (with props)
Modified:
    team/dlee/stasis-forward-optimization/apps/app_queue.c
    team/dlee/stasis-forward-optimization/include/asterisk/stasis.h
    team/dlee/stasis-forward-optimization/main/cdr.c
    team/dlee/stasis-forward-optimization/main/cel.c
    team/dlee/stasis-forward-optimization/main/channel_internal_api.c
    team/dlee/stasis-forward-optimization/main/manager.c
    team/dlee/stasis-forward-optimization/main/manager_bridges.c
    team/dlee/stasis-forward-optimization/main/manager_channels.c
    team/dlee/stasis-forward-optimization/main/manager_mwi.c
    team/dlee/stasis-forward-optimization/main/manager_system.c
    team/dlee/stasis-forward-optimization/main/stasis.c
    team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c
    team/dlee/stasis-forward-optimization/res/stasis/app.c

Modified: team/dlee/stasis-forward-optimization/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/apps/app_queue.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/apps/app_queue.c (original)
+++ team/dlee/stasis-forward-optimization/apps/app_queue.c Sat Sep 21 10:13:36 2013
@@ -10309,7 +10309,7 @@
 };
 
 static struct stasis_message_router *agent_router;
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 static int unload_module(void)
 {
@@ -10337,7 +10337,7 @@
 		stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
 	}
 	stasis_message_router_unsubscribe_and_join(agent_router);
-	topic_forwarder = stasis_unsubscribe(topic_forwarder);
+	topic_forwarder = stasis_forward_cancel(topic_forwarder);
 
 	STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);

Modified: team/dlee/stasis-forward-optimization/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/include/asterisk/stasis.h?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/include/asterisk/stasis.h (original)
+++ team/dlee/stasis-forward-optimization/include/asterisk/stasis.h Sat Sep 21 10:13:36 2013
@@ -464,6 +464,8 @@
 struct stasis_subscription *stasis_unsubscribe_and_join(
 	struct stasis_subscription *subscription);
 
+struct stasis_forward;
+
 /*!
  * \brief Create a subscription which forwards all messages from one topic to
  * another.
@@ -477,8 +479,10 @@
  * \return \c NULL on error.
  * \since 12
  */
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
 	struct stasis_topic *to_topic);
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
 
 /*!
  * \brief Get the unique ID for the subscription.

Added: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/include/asterisk/vector.h?view=auto&rev=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/include/asterisk/vector.h (added)
+++ team/dlee/stasis-forward-optimization/include/asterisk/vector.h Sat Sep 21 10:13:36 2013
@@ -1,0 +1,172 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_VECTOR_H
+#define _ASTERISK_VECTOR_H
+
+/*! \file
+ *
+ * \brief Vector container support.
+ *
+ * A vector is a variable length array, with properties that can be useful when
+ * order doesn't matter.
+ *  - Appends are asymptotically constant time.
+ *  - Unordered removes are constant time.
+ *  - Search is linear time
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ * \since 12
+ */
+
+/*! \brief Define a vector structure */
+#define ast_vector(type)			\
+	struct {				\
+		type *elems;			\
+		size_t max;			\
+		size_t current;			\
+	}
+
+/*!
+ * \brief Initialize a vector
+ *
+ * If \a size is 0, then no space will be allocated until the vector is
+ * appended to.
+ *
+ * \param vec Vector to initialize.
+ * \param size Initial size of the vector.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_init(vec, size) ({					\
+	size_t __size = (size);						\
+	size_t alloc_size = __size * sizeof(*(vec).elems); 		\
+	(vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL;	\
+	(vec).current = 0;						\
+	if ((vec).elems) {						\
+		(vec).max = __size;					\
+	} else {							\
+		(vec).max = 0;						\
+	}								\
+	alloc_size == 0 || (vec).elems != NULL ? 0 : -1;		\
+})
+
+/*!
+ * \brief Deallocates this vector.
+ *
+ * If any code to free the elements of this vector need to be run, that should
+ * be done prior to this call.
+ *
+ * \param vec Vector to deallocate.
+ */
+#define ast_vector_free(vec) do {		\
+	ast_free((vec).elems);			\
+	(vec).elems = NULL;			\
+	(vec).max = 0;				\
+	(vec).current = 0;			\
+} while (0)
+
+/*!
+ * \brief Append an element to a vector, growing the vector if needed.
+ *
+ * \param vec Vector to append to.
+ * \param elem Element to append.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_append(vec, elem) ({					\
+	int res = 0;							\
+									\
+	if ((vec).current + 1 > (vec).max) {				\
+		size_t new_max = (vec).max ? 2 * (vec).max : 1;		\
+		typeof((vec).elems) new_elems = ast_realloc(		\
+			(vec).elems, new_max * sizeof(*new_elems));	\
+		if (new_elems) {					\
+			(vec).elems = new_elems;				\
+			(vec).max = new_max;				\
+		} else {						\
+			res = -1;					\
+		}							\
+	}								\
+									\
+	if (res == 0) {							\
+		(vec).elems[(vec).current++] = (elem);			\
+	}								\
+	res;								\
+})
+
+/*!
+ * \brief Remove an element from a vector by index.
+ *
+ * Note that elements in the vector may be reordered, so that the remove can
+ * happen in constant time.
+ *
+ * \param vec Vector to remove from.
+ * \param idx Index of the element to remove.
+ */
+#define ast_vector_remove_unordered(vec, idx) do {		\
+	size_t __idx = (idx);					\
+	ast_assert(__idx < (vec).current);			\
+	(vec).elems[__idx] = (vec).elems[--(vec).current];	\
+} while (0)
+
+/*!
+ * \brief Remove an element from a vector.
+ *
+ * \param vec Vector to remove from.
+ * \param elem Element to remove
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_elem_unordered(vec, elem) ({			\
+	int res = -1;							\
+	size_t idx;							\
+	typeof(elem) __elem = (elem);					\
+	for (idx = 0; idx < (vec).current; ++idx) {			\
+		if (__elem == (vec).elems[idx]) {			\
+			(vec).elems[idx] = (vec).elems[--(vec).current]; \
+			res = 0;					\
+			break;						\
+		}							\
+	}								\
+	res;								\
+})
+
+
+/*!
+ * \brief Get the number of elements in a vector.
+ *
+ * \param vec Vector to query.
+ * \return Number of elements in the vector.
+ */
+#define ast_vector_size(vec) (vec).current
+
+/*!
+ * \brief Get an element from a vector.
+ *
+ * \param vec Vector to query.
+ * \param idx Index of the element to get.
+ */
+#define ast_vector_get(vec, idx) ({		\
+	size_t __idx = (idx);			\
+	ast_assert(__idx < (vec).current);	\
+	(vec).elems[__idx];			\
+})
+
+#endif /* _ASTERISK_VECTOR_H */

Propchange: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: team/dlee/stasis-forward-optimization/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/cdr.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/cdr.c (original)
+++ team/dlee/stasis-forward-optimization/main/cdr.c Sat Sep 21 10:13:36 2013
@@ -334,13 +334,13 @@
 static struct stasis_message_router *stasis_router;
 
 /*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
 
 /*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
 
 /*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
 
 /*! \brief The parent topic for all topics we want to aggregate for CDRs */
 static struct stasis_topic *cdr_topic;
@@ -3965,9 +3965,9 @@
 
 static void cdr_engine_cleanup(void)
 {
-	channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
-	bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
-	parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+	channel_subscription = stasis_forward_cancel(channel_subscription);
+	bridge_subscription = stasis_forward_cancel(bridge_subscription);
+	parking_subscription = stasis_forward_cancel(parking_subscription);
 	stasis_message_router_unsubscribe_and_join(stasis_router);
 	ao2_cleanup(cdr_topic);
 	cdr_topic = NULL;

Modified: team/dlee/stasis-forward-optimization/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/cel.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/cel.c (original)
+++ team/dlee/stasis-forward-optimization/main/cel.c Sat Sep 21 10:13:36 2013
@@ -121,16 +121,16 @@
 static struct stasis_topic *cel_aggregation_topic;
 
 /*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
 
 /*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
 
 /*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
 
 /*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
 
 struct stasis_message_type *cel_generic_type(void);
 STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1343,10 +1343,10 @@
 	cel_aggregation_topic = NULL;
 	ao2_cleanup(cel_topic);
 	cel_topic = NULL;
-	cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
-	cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
-	cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
-	cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+	cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+	cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+	cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+	cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
 	ast_cli_unregister(&cli_status);
 	ao2_cleanup(cel_dialstatus_store);
 	cel_dialstatus_store = NULL;

Modified: team/dlee/stasis-forward-optimization/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/channel_internal_api.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/channel_internal_api.c (original)
+++ team/dlee/stasis-forward-optimization/main/channel_internal_api.c Sat Sep 21 10:13:36 2013
@@ -207,8 +207,7 @@
 	char sending_dtmf_digit;			/*!< Digit this channel is currently sending out. (zero if not sending) */
 	struct timeval sending_dtmf_tv;		/*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
 	struct stasis_cp_single *topics;		/*!< Topic for all channel's events */
-	struct stasis_subscription *forwarder;		/*!< Subscription for event forwarding to all topic */
-	struct stasis_subscription *endpoint_forward;	/*!< Subscription for event forwarding to endpoint's topic */
+	struct stasis_forward *endpoint_forward;	/*!< Subscription for event forwarding to endpoint's topic */
 };
 
 /*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@
 
 	ast_string_field_free_memory(chan);
 
-	chan->forwarder = stasis_unsubscribe(chan->forwarder);
-	chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+	chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
 
 	stasis_cp_single_unsubscribe(chan->topics);
 	chan->topics = NULL;

Modified: team/dlee/stasis-forward-optimization/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager.c Sat Sep 21 10:13:36 2013
@@ -1126,7 +1126,7 @@
 static struct stasis_message_router *stasis_router;
 
 /*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
 
 #define MGR_SHOW_TERMINAL_WIDTH 80
 
@@ -7755,7 +7755,7 @@
 		stasis_message_router_unsubscribe_and_join(stasis_router);
 		stasis_router = NULL;
 	}
-	stasis_unsubscribe_and_join(rtp_topic_forwarder);
+	stasis_forward_cancel(rtp_topic_forwarder);
 	rtp_topic_forwarder = NULL;
 	ao2_cleanup(manager_topic);
 	manager_topic = NULL;

Modified: team/dlee/stasis-forward-optimization/main/manager_bridges.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_bridges.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_bridges.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_bridges.c Sat Sep 21 10:13:36 2013
@@ -106,7 +106,7 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 struct ast_str *ast_manager_build_bridge_state_string_prefix(
 	const struct ast_bridge_snapshot *snapshot,
@@ -456,7 +456,7 @@
 
 static void manager_bridging_cleanup(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/dlee/stasis-forward-optimization/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_channels.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_channels.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_channels.c Sat Sep 21 10:13:36 2013
@@ -370,7 +370,7 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 struct ast_str *ast_manager_build_channel_state_string_prefix(
 		const struct ast_channel_snapshot *snapshot,
@@ -1100,7 +1100,7 @@
 
 static void manager_channels_shutdown(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/dlee/stasis-forward-optimization/main/manager_mwi.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_mwi.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_mwi.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_mwi.c Sat Sep 21 10:13:36 2013
@@ -41,7 +41,7 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 /*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
 static int exclude_event_cb(const char *key)
@@ -149,7 +149,7 @@
 
 static void manager_mwi_shutdown(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/dlee/stasis-forward-optimization/main/manager_system.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_system.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_system.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_system.c Sat Sep 21 10:13:36 2013
@@ -34,11 +34,11 @@
 /*! \brief The \ref stasis subscription returned by the forwarding of the system topic
  * to the manager topic
  */
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
 
 static void manager_system_shutdown(void)
 {
-	stasis_unsubscribe(topic_forwarder);
+	stasis_forward_cancel(topic_forwarder);
 	topic_forwarder = NULL;
 }
 

Modified: team/dlee/stasis-forward-optimization/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis.c Sat Sep 21 10:13:36 2013
@@ -29,7 +29,7 @@
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
@@ -37,6 +37,7 @@
 #include "asterisk/taskprocessor.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
+#include "asterisk/vector.h"
 
 /*!
  * \page stasis-impl Stasis Implementation Notes
@@ -139,15 +140,17 @@
 struct stasis_topic {
 	char *name;
 	/*! Variable length array of the subscribers */
-	struct stasis_subscription **subscribers;
-	/*! Allocated length of the subscribers array */
-	size_t num_subscribers_max;
-	/*! Current size of the subscribers array */
-	size_t num_subscribers_current;
+	ast_vector(struct stasis_subscription *) subscribers;
+
+	/*! Topics forwarding into this topic */
+	ast_vector(struct stasis_topic *) upstream_topics;
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+	struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
 
 static void topic_dtor(void *obj)
 {
@@ -155,16 +158,18 @@
 
 	/* Subscribers hold a reference to topics, so they should all be
 	 * unsubscribed before we get here. */
-	ast_assert(topic->num_subscribers_current == 0);
+	ast_assert(ast_vector_size(topic->subscribers) == 0);
 	ast_free(topic->name);
 	topic->name = NULL;
-	ast_free(topic->subscribers);
-	topic->subscribers = NULL;
+
+	ast_vector_free(topic->subscribers);
+	ast_vector_free(topic->upstream_topics);
 }
 
 struct stasis_topic *stasis_topic_create(const char *name)
 {
 	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+	int res = 0;
 
 	topic = ao2_alloc(sizeof(*topic), topic_dtor);
 
@@ -177,9 +182,10 @@
 		return NULL;
 	}
 
-	topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
-	topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
-	if (!topic->subscribers) {
+	res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+	res |= ast_vector_init(topic->upstream_topics, 0);
+
+	if (res != 0) {
 		return NULL;
 	}
 
@@ -322,27 +328,30 @@
 
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
-	if (sub) {
-		size_t i;
-		/* The subscription may be the last ref to this topic. Hold
-		 * the topic ref open until after the unlock. */
-		RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
-			ao2_cleanup);
-		SCOPED_AO2LOCK(lock_topic, topic);
-
-		for (i = 0; i < topic->num_subscribers_current; ++i) {
-			if (topic->subscribers[i] == sub) {
-				send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
-				/* swap [i] with last entry; remove last entry */
-				topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
-				/* Unsubscribing unrefs the subscription */
-				ao2_cleanup(sub);
-				return NULL;
-			}
-		}
-
-		ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
-	}
+	/* The subscription may be the last ref to this topic. Hold
+	 * the topic ref open until after the unlock. */
+	RAII_VAR(struct stasis_topic *, topic,
+		ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
+
+	if (!sub) {
+		return NULL;
+	}
+
+	/* While we don't know if the removal will be successful, we have to
+	 * send the Unsubscribe first, so sub will get its final message.
+	 * Since failure is a serious coding problem that shouldn't happen,
+	 * shouldn't be a big deal.
+	 */
+	send_subscription_change_message(topic,	sub->uniqueid, "Unsubscribe");
+
+	if (topic_remove_subscription(sub->topic, sub) != 0) {
+		ast_log(LOG_ERROR,
+			"Internal error: subscription has invalid topic\n");
+		return NULL;
+	}
+
+	/* Unsubscribing unrefs the subscription */
+	ao2_cleanup(sub);
 	return NULL;
 }
 
@@ -392,8 +401,8 @@
 		struct stasis_topic *topic = sub->topic;
 		SCOPED_AO2LOCK(lock_topic, topic);
 
-		for (i = 0; i < topic->num_subscribers_current; ++i) {
-			if (topic->subscribers[i] == sub) {
+		for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+			if (ast_vector_get(topic->subscribers, i) == sub) {
 				return 1;
 			}
 		}
@@ -435,18 +444,8 @@
  */
 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-	struct stasis_subscription **subscribers;
+	size_t idx;
 	SCOPED_AO2LOCK(lock, topic);
-
-	/* Increase list size, if needed */
-	if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
-		subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
-		if (!subscribers) {
-			return -1;
-		}
-		topic->subscribers = subscribers;
-		topic->num_subscribers_max *= 2;
-	}
 
 	/* The reference from the topic to the subscription is shared with
 	 * the owner of the subscription, which will explicitly unsubscribe
@@ -454,8 +453,27 @@
 	 *
 	 * If we bumped the refcount here, the owner would have to unsubscribe
 	 * and cleanup, which is a bit awkward. */
-	topic->subscribers[topic->num_subscribers_current++] = sub;
+	ast_vector_append(topic->subscribers, sub);
+
+	for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+		topic_add_subscription(
+			ast_vector_get(topic->upstream_topics, idx), sub);
+	}
+
 	return 0;
+}
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+	size_t idx;
+	SCOPED_AO2LOCK(lock_topic, topic);
+
+	for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+		topic_remove_subscription(
+			ast_vector_get(topic->upstream_topics, idx), sub);
+	}
+
+	return ast_vector_remove_elem_unordered(topic->subscribers, sub);
 }
 
 /*!
@@ -533,8 +551,8 @@
 	ast_assert(publisher_topic != NULL);
 	ast_assert(message != NULL);
 
-	for (i = 0; i < topic->num_subscribers_current; ++i) {
-		struct stasis_subscription *sub = topic->subscribers[i];
+	for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+		struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
 
 		ast_assert(sub != NULL);
 
@@ -565,34 +583,77 @@
 	stasis_forward_message(topic, topic, message);
 }
 
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
-{
-	struct stasis_topic *to_topic = data;
-	stasis_forward_message(to_topic, topic, message);
-
-	if (stasis_subscription_final_message(sub, message)) {
-		ao2_cleanup(to_topic);
-	}
-}
-
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
-{
-	struct stasis_subscription *sub;
+struct stasis_forward {
+	struct stasis_topic *from_topic;
+	struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
+{
+	struct stasis_forward *forward = obj;
+
+	ao2_cleanup(forward->from_topic);
+	ao2_cleanup(forward->to_topic);
+}
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
+{
+	if (forward) {
+		int idx;
+
+		struct stasis_topic *from = forward->from_topic;
+		struct stasis_topic *to = forward->to_topic;
+
+		SCOPED_AO2LOCK(to_lock, to);
+
+		ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+		ao2_lock(from);
+		for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+			topic_remove_subscription(
+				from, ast_vector_get(to->subscribers, idx));
+		}
+		ao2_unlock(from);
+	}
+
+	return NULL;
+}
+
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+	struct stasis_topic *to_topic)
+{
+	struct stasis_forward *forward;
 	if (!from_topic || !to_topic) {
 		return NULL;
 	}
 
-	/* Forwarding subscriptions should dispatch directly instead of having a
-	 * mailbox. Otherwise, messages forwarded to the same topic from
-	 * different topics may get reordered. Which is bad.
-	 */
-	sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
-	if (sub) {
-		/* hold a ref to to_topic for this forwarding subscription */
-		ao2_ref(to_topic, +1);
-	}
-	return sub;
+	forward = ao2_alloc(sizeof(*forward), forward_dtor);
+	if (!forward) {
+		return NULL;
+	}
+
+	forward->from_topic = ao2_bump(from_topic);
+	forward->to_topic = ao2_bump(to_topic);
+
+	{
+		SCOPED_AO2LOCK(lock, to_topic);
+		int res;
+
+		res = ast_vector_append(to_topic->upstream_topics, from_topic);
+		if (res != 0) {
+			return NULL;
+		}
+
+		{
+			SCOPED_AO2LOCK(lock, from_topic);
+			size_t idx;
+			for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+				topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
+			}
+		}
+	}
+
+	return ao2_bump(forward);
 }
 
 static void subscription_change_dtor(void *obj)
@@ -641,14 +702,14 @@
 }
 
 struct topic_pool_entry {
-	struct stasis_subscription *forward;
+	struct stasis_forward *forward;
 	struct stasis_topic *topic;
 };
 
 static void topic_pool_entry_dtor(void *obj)
 {
 	struct topic_pool_entry *entry = obj;
-	entry->forward = stasis_unsubscribe(entry->forward);
+	entry->forward = stasis_forward_cancel(entry->forward);
 	ao2_cleanup(entry->topic);
 	entry->topic = NULL;
 }

Modified: team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c Sat Sep 21 10:13:36 2013
@@ -39,15 +39,15 @@
 	struct stasis_topic *topic_cached;
 	struct stasis_cache *cache;
 
-	struct stasis_subscription *forward_all_to_cached;
+	struct stasis_forward *forward_all_to_cached;
 };
 
 struct stasis_cp_single {
 	struct stasis_topic *topic;
 	struct stasis_caching_topic *topic_cached;
 
-	struct stasis_subscription *forward_topic_to_all;
-	struct stasis_subscription *forward_cached_to_all;
+	struct stasis_forward *forward_topic_to_all;
+	struct stasis_forward *forward_cached_to_all;
 };
 
 static void all_dtor(void *obj)
@@ -60,7 +60,7 @@
 	all->topic_cached = NULL;
 	ao2_cleanup(all->cache);
 	all->cache = NULL;
-	stasis_unsubscribe_and_join(all->forward_all_to_cached);
+	stasis_forward_cancel(all->forward_all_to_cached);
 	all->forward_all_to_cached = NULL;
 }
 
@@ -172,9 +172,9 @@
 		return;
 	}
 
-	stasis_unsubscribe(one->forward_topic_to_all);
+	stasis_forward_cancel(one->forward_topic_to_all);
 	one->forward_topic_to_all = NULL;
-	stasis_unsubscribe(one->forward_cached_to_all);
+	stasis_forward_cancel(one->forward_cached_to_all);
 	one->forward_cached_to_all = NULL;
 	stasis_caching_unsubscribe(one->topic_cached);
 	one->topic_cached = NULL;

Modified: team/dlee/stasis-forward-optimization/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/res/stasis/app.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/res/stasis/app.c (original)
+++ team/dlee/stasis-forward-optimization/res/stasis/app.c Sat Sep 21 10:13:36 2013
@@ -58,9 +58,9 @@
 	int interested;
 
 	/*! Forward for the regular topic */
-	struct stasis_subscription *topic_forward;
+	struct stasis_forward *topic_forward;
 	/*! Forward for the caching topic */
-	struct stasis_subscription *topic_cached_forward;
+	struct stasis_forward *topic_cached_forward;
 
 	/*! Unique id of the object being forwarded */
 	char id[];
@@ -78,9 +78,9 @@
 
 static void forwards_unsubscribe(struct app_forwards *forwards)
 {
-	stasis_unsubscribe(forwards->topic_forward);
+	stasis_forward_cancel(forwards->topic_forward);
 	forwards->topic_forward = NULL;
-	stasis_unsubscribe(forwards->topic_cached_forward);
+	stasis_forward_cancel(forwards->topic_cached_forward);
 	forwards->topic_cached_forward = NULL;
 }
 
@@ -129,7 +129,7 @@
 		ast_channel_topic_cached(chan), app->topic);
 	if (!forwards->topic_cached_forward) {
 		/* Half-subscribed is a bad thing */
-		stasis_unsubscribe(forwards->topic_forward);
+		stasis_forward_cancel(forwards->topic_forward);
 		forwards->topic_forward = NULL;
 		return NULL;
 	}
@@ -163,7 +163,7 @@
 		ast_bridge_topic_cached(bridge), app->topic);
 	if (!forwards->topic_cached_forward) {
 		/* Half-subscribed is a bad thing */
-		stasis_unsubscribe(forwards->topic_forward);
+		stasis_forward_cancel(forwards->topic_forward);
 		forwards->topic_forward = NULL;
 		return NULL;
 	}




More information about the asterisk-commits mailing list