[asterisk-commits] dlee: branch group/performance r399678 - in /team/group/performance: ./ apps/...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Sep 24 13:23:29 CDT 2013
Author: dlee
Date: Tue Sep 24 13:23:25 2013
New Revision: 399678
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399678
Log:
Merged ^/team/dlee/stasis-forward-optimization at 399664
Added:
team/group/performance/include/asterisk/vector.h
- copied unchanged from r399667, team/dlee/stasis-forward-optimization/include/asterisk/vector.h
Modified:
team/group/performance/ (props changed)
team/group/performance/apps/app_queue.c
team/group/performance/include/asterisk/stasis.h
team/group/performance/main/cdr.c
team/group/performance/main/cel.c
team/group/performance/main/channel_internal_api.c
team/group/performance/main/manager.c
team/group/performance/main/manager_bridges.c
team/group/performance/main/manager_channels.c
team/group/performance/main/manager_mwi.c
team/group/performance/main/manager_system.c
team/group/performance/main/stasis.c
team/group/performance/main/stasis_cache_pattern.c
team/group/performance/res/stasis/app.c
team/group/performance/tests/test_stasis.c
team/group/performance/tests/test_stasis_endpoints.c
Propchange: team/group/performance/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Sep 24 13:23:25 2013
@@ -1,1 +1,1 @@
-/team/dlee/performance:1-399659 /team/dlee/taskprocessor-optimization:1-399654
+/team/dlee/performance:1-399659 /team/dlee/taskprocessor-optimization:1-399654 /team/dlee/stasis-forward-optimization:1-399664
Modified: team/group/performance/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/apps/app_queue.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/apps/app_queue.c (original)
+++ team/group/performance/apps/app_queue.c Tue Sep 24 13:23:25 2013
@@ -10327,7 +10327,7 @@
};
static struct stasis_message_router *agent_router;
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static int unload_module(void)
{
@@ -10355,7 +10355,7 @@
stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
}
stasis_message_router_unsubscribe_and_join(agent_router);
- topic_forwarder = stasis_unsubscribe(topic_forwarder);
+ topic_forwarder = stasis_forward_cancel(topic_forwarder);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);
Modified: team/group/performance/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/include/asterisk/stasis.h?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/include/asterisk/stasis.h (original)
+++ team/group/performance/include/asterisk/stasis.h Tue Sep 24 13:23:25 2013
@@ -464,6 +464,8 @@
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
+struct stasis_forward;
+
/*!
* \brief Create a subscription which forwards all messages from one topic to
* another.
@@ -477,8 +479,10 @@
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
/*!
* \brief Get the unique ID for the subscription.
Modified: team/group/performance/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/cdr.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/cdr.c (original)
+++ team/group/performance/main/cdr.c Tue Sep 24 13:23:25 2013
@@ -334,13 +334,13 @@
static struct stasis_message_router *stasis_router;
/*! \brief Our subscription for bridges */
-static struct stasis_subscription *bridge_subscription;
+static struct stasis_forward *bridge_subscription;
/*! \brief Our subscription for channels */
-static struct stasis_subscription *channel_subscription;
+static struct stasis_forward *channel_subscription;
/*! \brief Our subscription for parking */
-static struct stasis_subscription *parking_subscription;
+static struct stasis_forward *parking_subscription;
/*! \brief The parent topic for all topics we want to aggregate for CDRs */
static struct stasis_topic *cdr_topic;
@@ -3965,9 +3965,9 @@
static void cdr_engine_cleanup(void)
{
- channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
- bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
- parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
+ channel_subscription = stasis_forward_cancel(channel_subscription);
+ bridge_subscription = stasis_forward_cancel(bridge_subscription);
+ parking_subscription = stasis_forward_cancel(parking_subscription);
stasis_message_router_unsubscribe_and_join(stasis_router);
ao2_cleanup(cdr_topic);
cdr_topic = NULL;
Modified: team/group/performance/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/cel.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/cel.c (original)
+++ team/group/performance/main/cel.c Tue Sep 24 13:23:25 2013
@@ -121,16 +121,16 @@
static struct stasis_topic *cel_aggregation_topic;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_channel_forwarder;
+static struct stasis_forward *cel_channel_forwarder;
/*! Subscription for forwarding the channel caching topic */
-static struct stasis_subscription *cel_bridge_forwarder;
+static struct stasis_forward *cel_bridge_forwarder;
/*! Subscription for forwarding the parking topic */
-static struct stasis_subscription *cel_parking_forwarder;
+static struct stasis_forward *cel_parking_forwarder;
/*! Subscription for forwarding the CEL-specific topic */
-static struct stasis_subscription *cel_cel_forwarder;
+static struct stasis_forward *cel_cel_forwarder;
struct stasis_message_type *cel_generic_type(void);
STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@@ -1343,10 +1343,10 @@
cel_aggregation_topic = NULL;
ao2_cleanup(cel_topic);
cel_topic = NULL;
- cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
- cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
- cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
- cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
+ cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
+ cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
+ cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
+ cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
ast_cli_unregister(&cli_status);
ao2_cleanup(cel_dialstatus_store);
cel_dialstatus_store = NULL;
Modified: team/group/performance/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/channel_internal_api.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/channel_internal_api.c (original)
+++ team/group/performance/main/channel_internal_api.c Tue Sep 24 13:23:25 2013
@@ -207,8 +207,7 @@
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */
- struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
- struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
+ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/*! \brief The monotonically increasing integer counter for channel uniqueids */
@@ -1429,8 +1428,7 @@
ast_string_field_free_memory(chan);
- chan->forwarder = stasis_unsubscribe(chan->forwarder);
- chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
+ chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
stasis_cp_single_unsubscribe(chan->topics);
chan->topics = NULL;
Modified: team/group/performance/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager.c (original)
+++ team/group/performance/main/manager.c Tue Sep 24 13:23:25 2013
@@ -1126,7 +1126,7 @@
static struct stasis_message_router *stasis_router;
/*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
-static struct stasis_subscription *rtp_topic_forwarder;
+static struct stasis_forward *rtp_topic_forwarder;
#define MGR_SHOW_TERMINAL_WIDTH 80
@@ -7755,7 +7755,7 @@
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
}
- stasis_unsubscribe_and_join(rtp_topic_forwarder);
+ stasis_forward_cancel(rtp_topic_forwarder);
rtp_topic_forwarder = NULL;
ao2_cleanup(manager_topic);
manager_topic = NULL;
Modified: team/group/performance/main/manager_bridges.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_bridges.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_bridges.c (original)
+++ team/group/performance/main/manager_bridges.c Tue Sep 24 13:23:25 2013
@@ -106,7 +106,7 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_bridge_state_string_prefix(
const struct ast_bridge_snapshot *snapshot,
@@ -456,7 +456,7 @@
static void manager_bridging_cleanup(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/group/performance/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_channels.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_channels.c (original)
+++ team/group/performance/main/manager_channels.c Tue Sep 24 13:23:25 2013
@@ -370,7 +370,7 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
@@ -1100,7 +1100,7 @@
static void manager_channels_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/group/performance/main/manager_mwi.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_mwi.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_mwi.c (original)
+++ team/group/performance/main/manager_mwi.c Tue Sep 24 13:23:25 2013
@@ -41,7 +41,7 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
/*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
static int exclude_event_cb(const char *key)
@@ -149,7 +149,7 @@
static void manager_mwi_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/group/performance/main/manager_system.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/manager_system.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/manager_system.c (original)
+++ team/group/performance/main/manager_system.c Tue Sep 24 13:23:25 2013
@@ -34,11 +34,11 @@
/*! \brief The \ref stasis subscription returned by the forwarding of the system topic
* to the manager topic
*/
-static struct stasis_subscription *topic_forwarder;
+static struct stasis_forward *topic_forwarder;
static void manager_system_shutdown(void)
{
- stasis_unsubscribe(topic_forwarder);
+ stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}
Modified: team/group/performance/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/stasis.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/stasis.c (original)
+++ team/group/performance/main/stasis.c Tue Sep 24 13:23:25 2013
@@ -29,7 +29,7 @@
#include "asterisk.h"
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
@@ -37,6 +37,7 @@
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
@@ -139,15 +140,17 @@
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
- struct stasis_subscription **subscribers;
- /*! Allocated length of the subscribers array */
- size_t num_subscribers_max;
- /*! Current size of the subscribers array */
- size_t num_subscribers_current;
+ ast_vector(struct stasis_subscription *) subscribers;
+
+ /*! Topics forwarding into this topic */
+ ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object */
-static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
+static int topic_add_subscription(struct stasis_topic *topic,
+ struct stasis_subscription *sub);
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
{
@@ -155,16 +158,18 @@
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here. */
- ast_assert(topic->num_subscribers_current == 0);
+ ast_assert(ast_vector_size(topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
- ast_free(topic->subscribers);
- topic->subscribers = NULL;
+
+ ast_vector_free(topic->subscribers);
+ ast_vector_free(topic->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
@@ -177,9 +182,10 @@
return NULL;
}
- topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
- topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
- if (!topic->subscribers) {
+ res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+ res |= ast_vector_init(topic->upstream_topics, 0);
+
+ if (res != 0) {
return NULL;
}
@@ -264,7 +270,8 @@
}
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
@@ -306,7 +313,7 @@
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
}
- send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+ send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub;
@@ -322,27 +329,28 @@
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
- if (sub) {
- size_t i;
- /* The subscription may be the last ref to this topic. Hold
- * the topic ref open until after the unlock. */
- RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
- ao2_cleanup);
- SCOPED_AO2LOCK(lock_topic, topic);
-
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
- send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
- /* swap [i] with last entry; remove last entry */
- topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
- /* Unsubscribing unrefs the subscription */
- ao2_cleanup(sub);
- return NULL;
- }
- }
-
- ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
- }
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic,
+ ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
+
+ if (!sub) {
+ return NULL;
+ }
+
+ /* We have to remove the subscription first, to ensure the unsubscribe
+ * is the final message */
+ if (topic_remove_subscription(sub->topic, sub) != 0) {
+ ast_log(LOG_ERROR,
+ "Internal error: subscription has invalid topic\n");
+ return NULL;
+ }
+
+ /* Now let everyone know about the unsubscribe */
+ send_subscription_unsubscribe(topic, sub);
+
+ /* Unsubscribing unrefs the subscription */
+ ao2_cleanup(sub);
return NULL;
}
@@ -392,8 +400,8 @@
struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- if (topic->subscribers[i] == sub) {
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ if (ast_vector_get(topic->subscribers, i) == sub) {
return 1;
}
}
@@ -435,18 +443,8 @@
*/
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
- struct stasis_subscription **subscribers;
+ size_t idx;
SCOPED_AO2LOCK(lock, topic);
-
- /* Increase list size, if needed */
- if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
- subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
- if (!subscribers) {
- return -1;
- }
- topic->subscribers = subscribers;
- topic->num_subscribers_max *= 2;
- }
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
@@ -454,8 +452,27 @@
*
* If we bumped the refcount here, the owner would have to unsubscribe
* and cleanup, which is a bit awkward. */
- topic->subscribers[topic->num_subscribers_current++] = sub;
+ ast_vector_append(topic->subscribers, sub);
+
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_add_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
+
return 0;
+}
+
+static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
+{
+ size_t idx;
+ SCOPED_AO2LOCK(lock_topic, topic);
+
+ for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+ topic_remove_subscription(
+ ast_vector_get(topic->upstream_topics, idx), sub);
+ }
+
+ return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
/*!
@@ -520,6 +537,30 @@
return 0;
}
+static void dispatch_message(struct stasis_subscription *sub,
+ struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+ if (sub->mailbox) {
+ struct dispatch *dispatch;
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ return;
+ }
+
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
+ /* Push failed; just delete the dispatch.
+ */
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ dispatch_dtor(dispatch);
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, publisher_topic, message);
+ }
+}
+
void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
size_t i;
@@ -533,66 +574,97 @@
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
- for (i = 0; i < topic->num_subscribers_current; ++i) {
- struct stasis_subscription *sub = topic->subscribers[i];
+ for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
+ struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
- if (sub->mailbox) {
- struct dispatch *dispatch;
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
+ dispatch_message(sub, publisher_topic, message);
+
+ }
+}
+
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+ stasis_forward_message(topic, topic, message);
+}
+
+struct stasis_forward {
+ struct stasis_topic *from_topic;
+ struct stasis_topic *to_topic;
+};
+
+static void forward_dtor(void *obj)
+{
+ struct stasis_forward *forward = obj;
+
+ ao2_cleanup(forward->from_topic);
+ forward->from_topic = NULL;
+ ao2_cleanup(forward->to_topic);
+ forward->to_topic = NULL;
+}
+
+struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
+{
+ if (forward) {
+ int idx;
+
+ struct stasis_topic *from = forward->from_topic;
+ struct stasis_topic *to = forward->to_topic;
+
+ SCOPED_AO2LOCK(to_lock, to);
+
+ ast_vector_remove_elem_unordered(to->upstream_topics, from);
+
+ ao2_lock(from);
+ for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
+ topic_remove_subscription(
+ from, ast_vector_get(to->subscribers, idx));
+ }
+ ao2_unlock(from);
+ }
+
+ ao2_cleanup(forward);
+
+ return NULL;
+}
+
+struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic)
+{
+ RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
+
+ if (!from_topic || !to_topic) {
+ return NULL;
+ }
+
+ forward = ao2_alloc(sizeof(*forward), forward_dtor);
+ if (!forward) {
+ return NULL;
+ }
+
+ forward->from_topic = ao2_bump(from_topic);
+ forward->to_topic = ao2_bump(to_topic);
+
+ {
+ SCOPED_AO2LOCK(lock, to_topic);
+ int res;
+
+ res = ast_vector_append(to_topic->upstream_topics, from_topic);
+ if (res != 0) {
+ return NULL;
+ }
+
+ {
+ SCOPED_AO2LOCK(lock, from_topic);
+ size_t idx;
+ for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
+ topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
}
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
- /* Push failed; just delete the dispatch.
- */
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- dispatch_dtor(dispatch);
- }
- } else {
- /* Dispatch directly */
- subscription_invoke(sub, publisher_topic, message);
}
}
-}
-
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
-{
- stasis_forward_message(topic, topic, message);
-}
-
-/*! \brief Forwarding subscriber */
-static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
-{
- struct stasis_topic *to_topic = data;
- stasis_forward_message(to_topic, topic, message);
-
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(to_topic);
- }
-}
-
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
-{
- struct stasis_subscription *sub;
- if (!from_topic || !to_topic) {
- return NULL;
- }
-
- /* Forwarding subscriptions should dispatch directly instead of having a
- * mailbox. Otherwise, messages forwarded to the same topic from
- * different topics may get reordered. Which is bad.
- */
- sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
- if (sub) {
- /* hold a ref to to_topic for this forwarding subscription */
- ao2_ref(to_topic, +1);
- }
- return sub;
+
+ return ao2_bump(forward);
}
static void subscription_change_dtor(void *obj)
@@ -602,7 +674,7 @@
ao2_cleanup(change->topic);
}
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
@@ -620,12 +692,15 @@
return change;
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- change = subscription_change_alloc(topic, uniqueid, description);
+ /* This assumes that we have already subscribed */
+ ast_assert(stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
@@ -640,15 +715,42 @@
stasis_publish(topic, msg);
}
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+ struct stasis_subscription *sub)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ /* This assumes that we have already unsubscribed */
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+
+ /* Now we have to dispatch to the subscription itself */
+ dispatch_message(sub, topic, msg);
+}
+
struct topic_pool_entry {
- struct stasis_subscription *forward;
+ struct stasis_forward *forward;
struct stasis_topic *topic;
};
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
- entry->forward = stasis_unsubscribe(entry->forward);
+ entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
}
Modified: team/group/performance/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/stasis_cache_pattern.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/main/stasis_cache_pattern.c (original)
+++ team/group/performance/main/stasis_cache_pattern.c Tue Sep 24 13:23:25 2013
@@ -39,15 +39,15 @@
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
- struct stasis_subscription *forward_all_to_cached;
+ struct stasis_forward *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward_topic_to_all;
- struct stasis_subscription *forward_cached_to_all;
+ struct stasis_forward *forward_topic_to_all;
+ struct stasis_forward *forward_cached_to_all;
};
static void all_dtor(void *obj)
@@ -60,7 +60,7 @@
all->topic_cached = NULL;
ao2_cleanup(all->cache);
all->cache = NULL;
- stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ stasis_forward_cancel(all->forward_all_to_cached);
all->forward_all_to_cached = NULL;
}
@@ -172,9 +172,9 @@
return;
}
- stasis_unsubscribe(one->forward_topic_to_all);
+ stasis_forward_cancel(one->forward_topic_to_all);
one->forward_topic_to_all = NULL;
- stasis_unsubscribe(one->forward_cached_to_all);
+ stasis_forward_cancel(one->forward_cached_to_all);
one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;
Modified: team/group/performance/res/stasis/app.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/res/stasis/app.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/res/stasis/app.c (original)
+++ team/group/performance/res/stasis/app.c Tue Sep 24 13:23:25 2013
@@ -58,9 +58,9 @@
int interested;
/*! Forward for the regular topic */
- struct stasis_subscription *topic_forward;
+ struct stasis_forward *topic_forward;
/*! Forward for the caching topic */
- struct stasis_subscription *topic_cached_forward;
+ struct stasis_forward *topic_cached_forward;
/*! Unique id of the object being forwarded */
char id[];
@@ -78,9 +78,9 @@
static void forwards_unsubscribe(struct app_forwards *forwards)
{
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
- stasis_unsubscribe(forwards->topic_cached_forward);
+ stasis_forward_cancel(forwards->topic_cached_forward);
forwards->topic_cached_forward = NULL;
}
@@ -129,7 +129,7 @@
ast_channel_topic_cached(chan), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
@@ -163,7 +163,7 @@
ast_bridge_topic_cached(bridge), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
- stasis_unsubscribe(forwards->topic_forward);
+ stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
Modified: team/group/performance/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/tests/test_stasis.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/tests/test_stasis.c (original)
+++ team/group/performance/tests/test_stasis.c Tue Sep 24 13:23:25 2013
@@ -427,7 +427,7 @@
RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
@@ -499,8 +499,8 @@
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
- RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+ RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
Modified: team/group/performance/tests/test_stasis_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/tests/test_stasis_endpoints.c?view=diff&rev=399678&r1=399677&r2=399678
==============================================================================
--- team/group/performance/tests/test_stasis_endpoints.c (original)
+++ team/group/performance/tests/test_stasis_endpoints.c Tue Sep 24 13:23:25 2013
@@ -264,11 +264,14 @@
type = stasis_message_type(msg);
ast_test_validate(test, ast_channel_snapshot_type() == type);
+ /* The ordering of the cache clear and endpoint snapshot are
+ * unspecified */
msg = sink->messages[3];
- type = stasis_message_type(msg);
- ast_test_validate(test, stasis_cache_clear_type() == type);
-
- msg = sink->messages[4];
+ if (stasis_message_type(msg) == stasis_cache_clear_type()) {
+ /* Okay; the next message should be the endpoint snapshot */
+ msg = sink->messages[4];
+ }
+
type = stasis_message_type(msg);
ast_test_validate(test, ast_endpoint_snapshot_type() == type);
actual_snapshot = stasis_message_data(msg);
More information about the asterisk-commits
mailing list