[asterisk-commits] dlee: branch dlee/stasis-forward-optimization r399618 - in /team/dlee/stasis-...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Sat Sep 21 10:13:41 CDT 2013
Author: dlee
Date: Sat Sep 21 10:13:36 2013
New Revision: 399618
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399618
Log:
At least it compiles
Added:
team/dlee/stasis-forward-optimization/include/asterisk/vector.h (with props)
Modified:
team/dlee/stasis-forward-optimization/apps/app_queue.c
team/dlee/stasis-forward-optimization/include/asterisk/stasis.h
team/dlee/stasis-forward-optimization/main/cdr.c
team/dlee/stasis-forward-optimization/main/cel.c
team/dlee/stasis-forward-optimization/main/channel_internal_api.c
team/dlee/stasis-forward-optimization/main/manager.c
team/dlee/stasis-forward-optimization/main/manager_bridges.c
team/dlee/stasis-forward-optimization/main/manager_channels.c
team/dlee/stasis-forward-optimization/main/manager_mwi.c
team/dlee/stasis-forward-optimization/main/manager_system.c
team/dlee/stasis-forward-optimization/main/stasis.c
team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c
team/dlee/stasis-forward-optimization/res/stasis/app.c
Modified: team/dlee/stasis-forward-optimization/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/apps/app_queue.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/apps/app_queue.c (original)
+++ team/dlee/stasis-forward-optimization/apps/app_queue.c Sat Sep 21 10:13:36 2013
@@ -10309,7 +10309,7 @@
};
static struct stasis_message_router *agent_router;
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static int unload_module(void)
{
@@ -10337,7 +10337,7 @@
stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
}
stasis_message_router_unsubscribe_and_join(agent_router);
- topic_forwarder = stasis_unsubscribe(topic_forwarder);
+ topic_forwarder = stasis_forward_cancel(topic_forwarder);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);
Modified: team/dlee/stasis-forward-optimization/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/include/asterisk/stasis.h?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/include/asterisk/stasis.h (original)
+++ team/dlee/stasis-forward-optimization/include/asterisk/stasis.h Sat Sep 21 10:13:36 2013
@@ -464,6 +464,8 @@
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
+struct stasis_forward;
+
/*!
* \brief Create a subscription which forwards all messages from one topic to
* another.
@@ -477,8 +479,10 @@
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
/*!
* \brief Get the unique ID for the subscription.
Added: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/include/asterisk/vector.h?view=auto&rev=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/include/asterisk/vector.h (added)
+++ team/dlee/stasis-forward-optimization/include/asterisk/vector.h Sat Sep 21 10:13:36 2013
@@ -1,0 +1,172 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_VECTOR_H
+#define _ASTERISK_VECTOR_H
+
+/*! \file
+ *
+ * \brief Vector container support.
+ *
+ * A vector is a variable length array, with properties that can be useful when
+ * order doesn't matter.
+ * - Appends are asymptotically constant time.
+ * - Unordered removes are constant time.
+ * - Search is linear time.
+ *
+ * \author David M. Lee, II <dlee at digium.com>
+ * \since 12
+ */
+
+/*! \brief Define a vector structure */
+#define ast_vector(type) \
+ struct { \
+ type *elems; \
+ size_t max; \
+ size_t current; \
+ }
+
+/*!
+ * \brief Initialize a vector
+ *
+ * If \a size is 0, then no space will be allocated until the vector is
+ * appended to.
+ *
+ * \param vec Vector to initialize.
+ * \param size Initial size of the vector.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_init(vec, size) ({ \
+ size_t __size = (size); \
+ size_t alloc_size = __size * sizeof(*(vec).elems); \
+ (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \
+ (vec).current = 0; \
+ if ((vec).elems) { \
+ (vec).max = __size; \
+ } else { \
+ (vec).max = 0; \
+ } \
+ alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \
+})
+
+/*!
+ * \brief Deallocates this vector.
+ *
+ * If any code to free the elements of this vector needs to be run, that should
+ * be done prior to this call.
+ *
+ * \param vec Vector to deallocate.
+ */
+#define ast_vector_free(vec) do { \
+ ast_free((vec).elems); \
+ (vec).elems = NULL; \
+ (vec).max = 0; \
+ (vec).current = 0; \
+} while (0)
+
+/*!
+ * \brief Append an element to a vector, growing the vector if needed.
+ *
+ * \param vec Vector to append to.
+ * \param elem Element to append.
+ *
+ * \return 0 on success.
+ * \return Non-zero on failure.
+ */
+#define ast_vector_append(vec, elem) ({ \
+ int res = 0; \
+ \
+ if ((vec).current + 1 > (vec).max) { \
+ size_t new_max = (vec).max ? 2 * (vec).max : 1; \
+ typeof((vec).elems) new_elems = ast_realloc( \
+ (vec).elems, new_max * sizeof(*new_elems)); \
+ if (new_elems) { \
+ (vec).elems = new_elems; \
+ (vec).max = new_max; \
+ } else { \
+ res = -1; \
+ } \
+ } \
+ \
+ if (res == 0) { \
+ (vec).elems[(vec).current++] = (elem); \
+ } \
+ res; \
+})
+
+/*!
+ * \brief Remove an element from a vector by index.
+ *
+ * Note that elements in the vector may be reordered, so that the remove can
+ * happen in constant time.
+ *
+ * \param vec Vector to remove from.
+ * \param idx Index of the element to remove.
+ */
+#define ast_vector_remove_unordered(vec, idx) do { \
+ size_t __idx = (idx); \
+ ast_assert(__idx < (vec).current); \
+ (vec).elems[__idx] = (vec).elems[--(vec).current]; \
+} while (0)
+
+/*!
+ * \brief Remove an element from a vector.
+ *
+ * \param vec Vector to remove from.
+ * \param elem Element to remove
+ * \return 0 if element was removed.
+ * \return Non-zero if element was not in the vector.
+ */
+#define ast_vector_remove_elem_unordered(vec, elem) ({ \
+ int res = -1; \
+ size_t idx; \
+ typeof(elem) __elem = (elem); \
+ for (idx = 0; idx < (vec).current; ++idx) { \
+ if (__elem == (vec).elems[idx]) { \
+ (vec).elems[idx] = (vec).elems[--(vec).current]; \
+ res = 0; \
+ break; \
+ } \
+ } \
+ res; \
+})
+
+
+/*!
+ * \brief Get the number of elements in a vector.
+ *
+ * \param vec Vector to query.
+ * \return Number of elements in the vector.
+ */
+#define ast_vector_size(vec) (vec).current
+
+/*!
+ * \brief Get an element from a vector.
+ *
+ * \param vec Vector to query.
+ * \param idx Index of the element to get.
+ */
+#define ast_vector_get(vec, idx) ({ \
+ size_t __idx = (idx); \
+ ast_assert(__idx < (vec).current); \
+ (vec).elems[__idx]; \
+})
+
+#endif /* _ASTERISK_VECTOR_H */
Propchange: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/dlee/stasis-forward-optimization/include/asterisk/vector.h
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: team/dlee/stasis-forward-optimization/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/cdr.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/cdr.c (original)
+++ team/dlee/stasis-forward-optimization/main/cdr.c Sat Sep 21 10:13:36 2013
@@ -334,13 +334,13 @@
static struct stasis_message_router *stasis_router;
/*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
/*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
/*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
/*! \brief The parent topic for all topics we want to aggregate for CDRs */
static struct stasis_topic *cdr_topic;
@@ -3965,9 +3965,9 @@
static void cdr_engine_cleanup(void)
{
- channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
- bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
- parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+ channel_subscription = stasis_forward_cancel(channel_subscription);
+ bridge_subscription = stasis_forward_cancel(bridge_subscription);
+ parking_subscription = stasis_forward_cancel(parking_subscription);
stasis_message_router_unsubscribe_and_join(stasis_router);
ao2_cleanup(cdr_topic);
cdr_topic = NULL;
Modified: team/dlee/stasis-forward-optimization/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/cel.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/cel.c (original)
+++ team/dlee/stasis-forward-optimization/main/cel.c Sat Sep 21 10:13:36 2013
@@ -121,16 +121,16 @@
static struct stasis_topic *cel_aggregation_topic;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
/*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
/*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
struct stasis_message_type *cel_generic_type(void);
STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1343,10 +1343,10 @@
cel_aggregation_topic = NULL;
ao2_cleanup(cel_topic);
cel_topic = NULL;
- cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
- cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
- cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
- cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+ cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+ cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+ cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+ cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
ast_cli_unregister(&cli_status);
ao2_cleanup(cel_dialstatus_store);
cel_dialstatus_store = NULL;
Modified: team/dlee/stasis-forward-optimization/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/channel_internal_api.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/channel_internal_api.c (original)
+++ team/dlee/stasis-forward-optimization/main/channel_internal_api.c Sat Sep 21 10:13:36 2013
@@ -207,8 +207,7 @@
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */
- struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
- struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
+ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@
ast_string_field_free_memory(chan);
- chan->forwarder = stasis_unsubscribe(chan->forwarder);
- chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+ chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
stasis_cp_single_unsubscribe(chan->topics);
chan->topics = NULL;
Modified: team/dlee/stasis-forward-optimization/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager.c Sat Sep 21 10:13:36 2013
@@ -1126,7 +1126,7 @@
static struct stasis_message_router *stasis_router;
/*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
#define MGR_SHOW_TERMINAL_WIDTH 80
@@ -7755,7 +7755,7 @@
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
}
- stasis_unsubscribe_and_join(rtp_topic_forwarder);
+ stasis_forward_cancel(rtp_topic_forwarder);
rtp_topic_forwarder = NULL;
ao2_cleanup(manager_topic);
manager_topic = NULL;
Modified: team/dlee/stasis-forward-optimization/main/manager_bridges.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_bridges.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_bridges.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_bridges.c Sat Sep 21 10:13:36 2013
@@ -106,7 +106,7 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_bridge_state_string_prefix(
const struct ast_bridge_snapshot *snapshot,
@@ -456,7 +456,7 @@
static void manager_bridging_cleanup(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/dlee/stasis-forward-optimization/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_channels.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_channels.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_channels.c Sat Sep 21 10:13:36 2013
@@ -370,7 +370,7 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
@@ -1100,7 +1100,7 @@
static void manager_channels_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/dlee/stasis-forward-optimization/main/manager_mwi.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_mwi.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_mwi.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_mwi.c Sat Sep 21 10:13:36 2013
@@ -41,7 +41,7 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
/*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
static int exclude_event_cb(const char *key)
@@ -149,7 +149,7 @@
static void manager_mwi_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/dlee/stasis-forward-optimization/main/manager_system.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/manager_system.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/manager_system.c (original)
+++ team/dlee/stasis-forward-optimization/main/manager_system.c Sat Sep 21 10:13:36 2013
@@ -34,11 +34,11 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the system topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static void manager_system_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/dlee/stasis-forward-optimization/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis.c Sat Sep 21 10:13:36 2013
@@ -29,7 +29,7 @@
#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
@@ -37,6 +37,7 @@
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
@@ -139,15 +140,17 @@
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
- struct stasis_subscription **subscribers;
- /*! Allocated length of the subscribers array */
- size_t num_subscribers_max;
- /*! Current size of the subscribers array */
- size_t num_subscribers_current;
+ ast_vector(struct stasis_subscription *) subscribers;
+
+ /*! Topics forwarding into this topic */
+ ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+ struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
{
@@ -155,16 +158,18 @@
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here. */
- ast_assert(topic->num_subscribers_current == 0);
+ ast_assert(ast_vector_size(topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
- ast_free(topic->subscribers);
- topic->subscribers = NULL;
+
+ ast_vector_free(topic->subscribers);
+ ast_vector_free(topic->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
@@ -177,9 +182,10 @@
return NULL;
}
- topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
- topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
- if (!topic->subscribers) {
+ res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+ res |= ast_vector_init(topic->upstream_topics, 0);
+
+ if (res != 0) {
return NULL;
}
@@ -322,27 +328,30 @@
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
- if (sub) {
- size_t i;
- /* The subscription may be the last ref to this topic. Hold
- * the topic ref open until after the unlock. */
- RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
- ao2_cleanup);
- SCOPED_AO2LOCK(lock_topic, topic);
-
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
- send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
- /* swap [i] with last entry; remove last entry */
- topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
- /* Unsubscribing unrefs the subscription */
- ao2_cleanup(sub);
- return NULL;
- }
- }
-
- ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
- }
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic,
+ ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
+
+ if (!sub) {
+ return NULL;
+ }
+
+ /* While we don't know if the removal will be successful, we have to
+ * send the Unsubscribe first, so sub will get its final message.
+ * Since failure is a serious coding problem that shouldn't happen,
+ * this shouldn't be a big deal.
+ */
+ send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
+
+ if (topic_remove_subscription(sub->topic, sub) != 0) {
+ ast_log(LOG_ERROR,
+ "Internal error: subscription has invalid topic\n");
+ return NULL;
+ }
+
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
return NULL;
}
@@ -392,8 +401,8 @@
struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ if (ast_vector_get(topic->subscribers, i) == sub) {
return 1;
}
}
@@ -435,18 +444,8 @@
*/
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- struct stasis_subscription **subscribers;
+ size_t idx;
SCOPED_AO2LOCK(lock, topic);
-
- /* Increase list size, if needed */
- if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
- subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
- if (!subscribers) {
- return -1;
- }
- topic->subscribers = subscribers;
- topic->num_subscribers_max *= 2;
- }
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
@@ -454,8 +453,27 @@
*
* If we bumped the refcount here, the owner would have to unsubscribe
* and cleanup, which is a bit awkward. */
- topic->subscribers[topic->num_subscribers_current++] = sub;
+ ast_vector_append(topic->subscribers, sub);
+
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_add_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
+
return 0;
+}
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+ size_t idx;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_remove_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
+
+ return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
/*!
@@ -533,8 +551,8 @@
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- struct stasis_subscription *sub = topic->subscribers[i];
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
@@ -565,34 +583,77 @@
stasis_forward_message(topic, topic, message);
}
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
-{
- struct stasis_topic *to_topic = data;
- stasis_forward_message(to_topic, topic, message);
-
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(to_topic);
- }
-}
-
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
-{
- struct stasis_subscription *sub;
+struct stasis_forward {
+ struct stasis_topic *from_topic;
+ struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
+{
+ struct stasis_forward *forward = obj;
+
+ ao2_cleanup(forward->from_topic);
+ ao2_cleanup(forward->to_topic);
+}
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
+{
+ if (forward) {
+ int idx;
+
+ struct stasis_topic *from = forward->from_topic;
+ struct stasis_topic *to = forward->to_topic;
+
+ SCOPED_AO2LOCK(to_lock, to);
+
+ ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+ ao2_lock(from);
+ for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+ topic_remove_subscription(
+ from, ast_vector_get(to->subscribers, idx));
+ }
+ ao2_unlock(from);
+ }
+
+ return NULL;
+}
+
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic)
+{
+ struct stasis_forward *forward;
if (!from_topic || !to_topic) {
return NULL;
}
- /* Forwarding subscriptions should dispatch directly instead of having a
- * mailbox. Otherwise, messages forwarded to the same topic from
- * different topics may get reordered. Which is bad.
- */
- sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
- if (sub) {
- /* hold a ref to to_topic for this forwarding subscription */
- ao2_ref(to_topic, +1);
- }
- return sub;
+ forward = ao2_alloc(sizeof(*forward), forward_dtor);
+ if (!forward) {
+ return NULL;
+ }
+
+ forward->from_topic = ao2_bump(from_topic);
+ forward->to_topic = ao2_bump(to_topic);
+
+ {
+ SCOPED_AO2LOCK(lock, to_topic);
+ int res;
+
+ res = ast_vector_append(to_topic->upstream_topics, from_topic);
+ if (res != 0) {
+ return NULL;
+ }
+
+ {
+ SCOPED_AO2LOCK(lock, from_topic);
+ size_t idx;
+ for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+ topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
+ }
+ }
+ }
+
+ return ao2_bump(forward);
}
static void subscription_change_dtor(void *obj)
@@ -641,14 +702,14 @@
}
struct topic_pool_entry {
- struct stasis_subscription *forward;
+ struct stasis_forward *forward;
struct stasis_topic *topic;
};
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
- entry->forward = stasis_unsubscribe(entry->forward);
+ entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
}
Modified: team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis_cache_pattern.c Sat Sep 21 10:13:36 2013
@@ -39,15 +39,15 @@
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
- struct stasis_subscription *forward_all_to_cached;
+ struct stasis_forward *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward_topic_to_all;
- struct stasis_subscription *forward_cached_to_all;
+ struct stasis_forward *forward_topic_to_all;
+ struct stasis_forward *forward_cached_to_all;
};
static void all_dtor(void *obj)
@@ -60,7 +60,7 @@
all->topic_cached = NULL;
ao2_cleanup(all->cache);
all->cache = NULL;
- stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ stasis_forward_cancel(all->forward_all_to_cached);
all->forward_all_to_cached = NULL;
}
@@ -172,9 +172,9 @@
return;
}
- stasis_unsubscribe(one->forward_topic_to_all);
+ stasis_forward_cancel(one->forward_topic_to_all);
one->forward_topic_to_all = NULL;
- stasis_unsubscribe(one->forward_cached_to_all);
+ stasis_forward_cancel(one->forward_cached_to_all);
one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;
Modified: team/dlee/stasis-forward-optimization/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/res/stasis/app.c?view=diff&rev=399618&r1=399617&r2=399618
==============================================================================
--- team/dlee/stasis-forward-optimization/res/stasis/app.c (original)
+++ team/dlee/stasis-forward-optimization/res/stasis/app.c Sat Sep 21 10:13:36 2013
@@ -58,9 +58,9 @@
int interested;
/*! Forward for the regular topic */
- struct stasis_subscription *topic_forward;
+ struct stasis_forward *topic_forward;
/*! Forward for the caching topic */
- struct stasis_subscription *topic_cached_forward;
+ struct stasis_forward *topic_cached_forward;
/*! Unique id of the object being forwarded */
char id[];
@@ -78,9 +78,9 @@
static void forwards_unsubscribe(struct app_forwards *forwards)
{
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
- stasis_unsubscribe(forwards->topic_cached_forward);
+ stasis_forward_cancel(forwards->topic_cached_forward);
forwards->topic_cached_forward = NULL;
}
@@ -129,7 +129,7 @@
ast_channel_topic_cached(chan), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
@@ -163,7 +163,7 @@
ast_bridge_topic_cached(bridge), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
More information about the asterisk-commits
mailing list