[asterisk-commits] dlee: branch dlee/tp-local r399779 - /team/dlee/tp-local/main/stasis.c
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Sep 25 13:09:40 CDT 2013
Author: dlee
Date: Wed Sep 25 13:09:38 2013
New Revision: 399779
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399779
Log:
Stasis using tp-local
Modified:
team/dlee/tp-local/main/stasis.c
Modified: team/dlee/tp-local/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/tp-local/main/stasis.c?view=diff&rev=399779&r1=399778&r2=399779
==============================================================================
--- team/dlee/tp-local/main/stasis.c (original)
+++ team/dlee/tp-local/main/stasis.c Wed Sep 25 13:09:38 2013
@@ -301,6 +301,9 @@
if (!sub->mailbox) {
return NULL;
}
+ ast_taskprocessor_set_local(sub->mailbox, sub);
+ /* Taskprocessor has a reference */
+ ao2_ref(sub, +1);
}
ao2_ref(topic, +1);
@@ -327,6 +330,13 @@
return internal_stasis_subscribe(topic, callback, data, 1);
}
+static int sub_cleanup(void *data)
+{
+ struct stasis_subscription *sub = data;
+ ao2_cleanup(sub);
+ return 0;
+}
+
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
/* The subscription may be the last ref to this topic. Hold
@@ -348,6 +358,11 @@
/* Now let everyone know about the unsubscribe */
send_subscription_unsubscribe(topic, sub);
+
+ /* When all that's done, remove the ref the mailbox has on the sub */
+ if (sub->mailbox) {
+ ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+ }
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
@@ -484,28 +499,24 @@
struct stasis_topic *topic;
/*! The message itself */
struct stasis_message *message;
- /*! Subscription receiving the message */
- struct stasis_subscription *sub;
};
-static void dispatch_dtor(struct dispatch *dispatch)
-{
+static void dispatch_dtor(void *obj)
+{
+ struct dispatch *dispatch = obj;
+
ao2_cleanup(dispatch->topic);
ao2_cleanup(dispatch->message);
- ao2_cleanup(dispatch->sub);
-
- ast_free(dispatch);
-}
-
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
+}
+
+static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message)
{
struct dispatch *dispatch;
ast_assert(topic != NULL);
ast_assert(message != NULL);
- ast_assert(sub != NULL);
-
- dispatch = ast_malloc(sizeof(*dispatch));
+
+ dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
if (!dispatch) {
return NULL;
}
@@ -515,9 +526,6 @@
dispatch->message = message;
ao2_ref(message, +1);
-
- dispatch->sub = sub;
- ao2_ref(sub, +1);
return dispatch;
}
@@ -527,34 +535,45 @@
* \param data \ref dispatch object
* \return 0
*/
-static int dispatch_exec(void *data)
-{
- struct dispatch *dispatch = data;
-
- subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
- dispatch_dtor(dispatch);
+static int dispatch_exec(struct ast_taskprocessor_local *local)
+{
+ struct stasis_subscription *sub = local->local_data;
+ struct dispatch *dispatch = local->data;
+
+ subscription_invoke(sub, dispatch->topic, dispatch->message);
+ ao2_cleanup(dispatch);
return 0;
}
static void dispatch_message(struct stasis_subscription *sub,
+ struct dispatch *dispatch)
+{
+ if (sub->mailbox) {
+ ao2_bump(dispatch);
+ if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, dispatch) != 0) {
+ /* Push failed; ugh. */
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ ao2_cleanup(dispatch);
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, dispatch->topic, dispatch->message);
+ }
+}
+
+static void dispatch_single_message(struct stasis_subscription *sub,
struct stasis_topic *publisher_topic, struct stasis_message *message)
{
if (sub->mailbox) {
- struct dispatch *dispatch;
-
- dispatch = dispatch_create(publisher_topic, message, sub);
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+ dispatch = dispatch_create(publisher_topic, message);
if (!dispatch) {
ast_log(LOG_DEBUG, "Dropping dispatch\n");
return;
}
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
- /* Push failed; just delete the dispatch.
- */
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- dispatch_dtor(dispatch);
- }
+ dispatch_message(sub, dispatch);
} else {
/* Dispatch directly */
subscription_invoke(sub, publisher_topic, message);
@@ -568,18 +587,29 @@
* Make sure we hold onto a reference while dispatching. */
RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
ao2_cleanup);
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
+ if (ast_vector_size(topic->subscribers) == 0) {
+ return;
+ }
+
+ dispatch = dispatch_create(publisher_topic, message);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ return;
+ }
+
for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
- dispatch_message(sub, publisher_topic, message);
+ dispatch_message(sub, dispatch);
}
}
@@ -738,8 +768,7 @@
stasis_publish(topic, msg);
- /* Now we have to dispatch to the subscription itself */
- dispatch_message(sub, topic, msg);
+ dispatch_single_message(sub, topic, msg);
}
struct topic_pool_entry {
More information about the asterisk-commits
mailing list