[svn-commits] dlee: branch dlee/stasis-forward-optimization r399662 - /team/dlee/stasis-for...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Tue Sep 24 11:09:47 CDT 2013
Author: dlee
Date: Tue Sep 24 11:09:45 2013
New Revision: 399662
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399662
Log:
Attempt at fixing unsubscribe delivery
Modified:
team/dlee/stasis-forward-optimization/main/stasis.c
Modified: team/dlee/stasis-forward-optimization/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-forward-optimization/main/stasis.c?view=diff&rev=399662&r1=399661&r2=399662
==============================================================================
--- team/dlee/stasis-forward-optimization/main/stasis.c (original)
+++ team/dlee/stasis-forward-optimization/main/stasis.c Tue Sep 24 11:09:45 2013
@@ -270,7 +270,8 @@
}
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
+static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
@@ -312,7 +313,7 @@
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
}
- send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
+ send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub;
@@ -337,18 +338,16 @@
return NULL;
}
- /* While we don't know if the removal will be successful, we have to
- * send the Unsubscribe first, so sub will get its final message.
- * Since failure is a serious coding problem that shouldn't happen,
- * shouldn't be a big deal.
- */
- send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
-
+ /* We have to remove the subscription first, to ensure the unsubscribe
+ * is the final message */
if (topic_remove_subscription(sub->topic, sub) != 0) {
ast_log(LOG_ERROR,
"Internal error: subscription has invalid topic\n");
return NULL;
}
+
+ /* Now let everyone know about the unsubscribe */
+ send_subscription_unsubscribe(topic, sub);
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
@@ -538,6 +537,30 @@
return 0;
}
+static void dispatch_message(struct stasis_subscription *sub,
+ struct stasis_topic *publisher_topic, struct stasis_message *message)
+{
+ if (sub->mailbox) {
+ struct dispatch *dispatch;
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ return;
+ }
+
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
+ /* Push failed; just delete the dispatch.
+ */
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ dispatch_dtor(dispatch);
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, publisher_topic, message);
+ }
+}
+
void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
size_t i;
@@ -556,25 +579,8 @@
ast_assert(sub != NULL);
- if (sub->mailbox) {
- struct dispatch *dispatch;
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
- /* Push failed; just delete the dispatch.
- */
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- dispatch_dtor(dispatch);
- }
- } else {
- /* Dispatch directly */
- subscription_invoke(sub, publisher_topic, message);
- }
+ dispatch_message(sub, publisher_topic, message);
+
}
}
@@ -668,7 +674,7 @@
ao2_cleanup(change->topic);
}
-static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
+static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
@@ -686,12 +692,15 @@
return change;
}
-static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
+static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- change = subscription_change_alloc(topic, uniqueid, description);
+ /* This assumes that we have already unsubscribed */
+ ast_assert(stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
@@ -704,6 +713,33 @@
}
stasis_publish(topic, msg);
+}
+
+static void send_subscription_unsubscribe(struct stasis_topic *topic,
+ struct stasis_subscription *sub)
+{
+ RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+ /* This assumes that we have already unsubscribed */
+ ast_assert(!stasis_subscription_is_subscribed(sub));
+
+ change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
+
+ if (!change) {
+ return;
+ }
+
+ msg = stasis_message_create(stasis_subscription_change_type(), change);
+
+ if (!msg) {
+ return;
+ }
+
+ stasis_publish(topic, msg);
+
+ /* Now we have to dispatch to the subscription itself */
+ dispatch_message(sub, topic, msg);
}
struct topic_pool_entry {
More information about the svn-commits
mailing list