[asterisk-commits] kmoore: branch kmoore/stasis-mwi r382439 - in /team/kmoore/stasis-mwi: includ...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Mon Mar 4 23:40:56 CST 2013
Author: kmoore
Date: Mon Mar 4 23:40:52 2013
New Revision: 382439
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=382439
Log:
Initial attempt at mitigating the race condition
Modified:
team/kmoore/stasis-mwi/include/asterisk/stasis.h
team/kmoore/stasis-mwi/main/app.c
team/kmoore/stasis-mwi/main/channel.c
team/kmoore/stasis-mwi/main/stasis.c
team/kmoore/stasis-mwi/main/stasis_cache.c
team/kmoore/stasis-mwi/tests/test_stasis.c
Modified: team/kmoore/stasis-mwi/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/include/asterisk/stasis.h?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/include/asterisk/stasis.h (original)
+++ team/kmoore/stasis-mwi/include/asterisk/stasis.h Mon Mar 4 23:40:52 2013
@@ -343,6 +343,15 @@
* \since 12
*/
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+
+/*!
+ * \brief Get the unique ID for the subscription.
+ *
+ * \param sub Subscription for which to get the unique ID.
+ * \return Unique ID for the subscription.
+ * \since 12
+ */
+const char *stasis_subscription_uniqueid(struct stasis_subscription *sub);
/*!
* \brief Holds details about changes to subscriptions for the specified topic
@@ -428,11 +437,12 @@
*
* \param original_topic Topic publishing snapshot messages.
* \param id_fn Callback to extract the id from a snapshot message.
+ * \param[out] sub Subscription produced by the caching backend; it is held for the life of the cache, alongside the returned stasis_caching_topic.
* \return New topic which changes snapshot messages to stasis_cache_update()
* messages, and forwards all other messages from the original topic.
* \since 12
*/
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub);
/*!
* \brief Returns the topic of cached events from a caching topics.
Modified: team/kmoore/stasis-mwi/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/app.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/app.c (original)
+++ team/kmoore/stasis-mwi/main/app.c Mon Mar 4 23:40:52 2013
@@ -82,6 +82,7 @@
static struct stasis_topic *__mwi_topic_all;
static struct stasis_caching_topic *__mwi_topic_cached;
+static struct stasis_subscription *__mwi_cache_sub;
static struct stasis_message_type *__mwi_message_type;
static struct ao2_container *__mwi_topics;
@@ -2784,6 +2785,8 @@
__mwi_message_type = NULL;
ao2_cleanup(__mwi_topics);
__mwi_topics = NULL;
+ stasis_unsubscribe(__mwi_cache_sub);
+ __mwi_cache_sub = NULL;
}
#define MWI_TOPIC_BUCKETS 57
@@ -2791,7 +2794,7 @@
int app_init(void)
{
__mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
- __mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id);
+ __mwi_topic_cached = stasis_caching_topic_create(__mwi_topic_all, mwi_state_get_id, &__mwi_cache_sub);
__mwi_message_type = stasis_message_type_create("stasis_mwi_state");
__mwi_topics = ao2_container_alloc(MWI_TOPIC_BUCKETS, mwi_topic_hash, mwi_topic_cmp);
ast_register_atexit(app_exit);
Modified: team/kmoore/stasis-mwi/main/channel.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/channel.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/channel.c (original)
+++ team/kmoore/stasis-mwi/main/channel.c Mon Mar 4 23:40:52 2013
@@ -153,13 +153,15 @@
static struct ao2_container *channels;
/*! \brief Message type for channel snapshot events */
-static struct stasis_message_type *__ast_channel_snapshot;
-
-static struct stasis_message_type *__ast_channel_varset_event;
-
-struct stasis_topic *__ast_channel_events_all;
-
-struct stasis_caching_topic *__ast_channel_events_all_cached;
+static struct stasis_message_type *__channel_snapshot;
+
+static struct stasis_message_type *__channel_varset_event;
+
+struct stasis_topic *__channel_events_all;
+
+struct stasis_caching_topic *__channel_events_all_cached;
+
+struct stasis_subscription *__channel_cache_sub;
/*! \brief map AST_CAUSE's to readable string representations
*
@@ -8629,12 +8631,12 @@
static void channels_shutdown(void)
{
- ao2_cleanup(__ast_channel_snapshot);
- __ast_channel_snapshot = NULL;
- ao2_cleanup(__ast_channel_events_all);
- __ast_channel_events_all = NULL;
- ao2_cleanup(__ast_channel_events_all_cached);
- __ast_channel_events_all_cached = NULL;
+ ao2_cleanup(__channel_snapshot);
+ __channel_snapshot = NULL;
+ ao2_cleanup(__channel_events_all);
+ __channel_events_all = NULL;
+ ao2_cleanup(__channel_events_all_cached);
+ __channel_events_all_cached = NULL;
ast_data_unregister(NULL);
ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
if (channels) {
@@ -8662,11 +8664,11 @@
ao2_container_register("channels", channels, prnt_channel_key);
}
- __ast_channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
- __ast_channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
-
- __ast_channel_events_all = stasis_topic_create("ast_channel_events_all");
- __ast_channel_events_all_cached = stasis_caching_topic_create(__ast_channel_events_all, channel_snapshot_get_id);
+ __channel_snapshot = stasis_message_type_create("ast_channel_snapshot");
+ __channel_varset_event = stasis_message_type_create("ast_channel_varset_event");
+
+ __channel_events_all = stasis_topic_create("ast_channel_events_all");
+ __channel_events_all_cached = stasis_caching_topic_create(__channel_events_all, channel_snapshot_get_id, &__channel_cache_sub);
ast_cli_register_multiple(cli_channel, ARRAY_LEN(cli_channel));
@@ -11304,22 +11306,22 @@
struct stasis_message_type *ast_channel_varset_event(void)
{
- return __ast_channel_varset_event;
+ return __channel_varset_event;
}
struct stasis_message_type *ast_channel_snapshot(void)
{
- return __ast_channel_snapshot;
+ return __channel_snapshot;
}
struct stasis_topic *ast_channel_events_all(void)
{
- return __ast_channel_events_all;
+ return __channel_events_all;
}
struct stasis_caching_topic *ast_channel_events_all_cached(void)
{
- return __ast_channel_events_all_cached;
+ return __channel_events_all_cached;
}
/* DO NOT PUT ADDITIONAL FUNCTIONS BELOW THIS BOUNDARY
Modified: team/kmoore/stasis-mwi/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis.c (original)
+++ team/kmoore/stasis-mwi/main/stasis.c Mon Mar 4 23:40:52 2013
@@ -66,9 +66,6 @@
struct stasis_topic *topic = obj;
ast_free(topic->name);
topic->name = NULL;
- while (topic->num_subscribers_current > 0) {
- stasis_unsubscribe(topic->subscribers[0]);
- }
ast_free(topic->subscribers);
topic->subscribers = NULL;
}
@@ -106,8 +103,8 @@
/*! \private */
struct stasis_subscription {
/*! Unique ID for this subscription */
- char *id;
- /*! Native pointer to the topic. AO2 ref could cause a cycle. */
+ char *uniqueid;
+ /*! Topic subscribed to. */
struct stasis_topic *topic;
/*! Mailbox for processing incoming messages. */
struct ast_taskprocessor *mailbox;
@@ -120,8 +117,6 @@
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
- /* This should never be called until after we've been unsubscribed,
- * which means that topic will have been reffed for the orphaned sub */
ao2_cleanup(sub->topic);
ao2_cleanup(sub->mailbox);
sub->mailbox = NULL;
@@ -153,8 +148,9 @@
}
}
- sub->id = ast_strdup(stupid_name);
- sub->topic = topic; /* Don't increase the refcount, or it will cause a cyclic ref! */
+ sub->uniqueid = ast_strdup(stupid_name);
+ ao2_ref(topic, +1);
+ sub->topic = topic;
sub->callback = callback;
sub->data = data;
@@ -177,6 +173,7 @@
if (sub) {
RAII_VAR(struct stasis_subscription *, cleanup_after_unlock, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, sub);
+ cleanup_after_unlock = sub;
if (sub->topic) {
size_t i;
@@ -185,14 +182,9 @@
for (i = 0; i < topic->num_subscribers_current; ++i) {
if (topic->subscribers[i] == sub) {
- send_subscription_change_message(topic, sub->id, "Unsubscribe", 1);
+ send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe", 1);
/* swap [i] with last entry; remove last entry */
topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
- /* We can't clean up now, since the lock is held. defer to RAII */
- cleanup_after_unlock = sub;
- /* Now that the topic will no longer have a ref on the sub,
- * the sub can have a ref on the topic without creating a cyclic ref */
- ao2_ref(sub->topic, +1);
return;
}
}
@@ -200,6 +192,11 @@
ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
}
}
+}
+
+const char *stasis_subscription_uniqueid(struct stasis_subscription *sub)
+{
+ return sub->uniqueid;
}
/*!
@@ -224,7 +221,7 @@
topic->num_subscribers_max *= 2;
}
- ao2_ref(sub, +1);
+ /* Don't ref sub here or we'll cause a reference cycle. */
topic->subscribers[topic->num_subscribers_current++] = sub;
return 0;
}
@@ -433,14 +430,14 @@
return;
}
- ao2_ref(change, +1);
- ao2_ref(msg, +1);
__forward_message_full(topic, topic, msg, topic_locked);
}
/*! \brief Cleanup function */
static void stasis_exit(void)
{
+ ao2_cleanup(__subscription_change_message_type);
+ __subscription_change_message_type = NULL;
ast_threadpool_shutdown(pool);
pool = NULL;
}
Modified: team/kmoore/stasis-mwi/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/main/stasis_cache.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/main/stasis_cache.c (original)
+++ team/kmoore/stasis-mwi/main/stasis_cache.c Mon Mar 4 23:40:52 2013
@@ -45,6 +45,7 @@
struct stasis_caching_topic {
struct ao2_container *cache;
struct stasis_topic *topic;
+ char *uniqueid;
snapshot_get_id id_fn;
};
@@ -54,6 +55,8 @@
caching_topic->cache = NULL;
ao2_cleanup(caching_topic->topic);
caching_topic->topic = NULL;
+ ast_free(caching_topic->uniqueid);
+ caching_topic->uniqueid = NULL;
}
struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
@@ -339,11 +342,20 @@
static void caching_topic_exec(void *data, struct stasis_topic *topic, struct stasis_message *message)
{
+ RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
struct stasis_caching_topic *caching_topic = data;
const char *id = NULL;
ast_assert(caching_topic->topic != NULL);
ast_assert(caching_topic->id_fn != NULL);
+
+ if (stasis_subscription_change() == stasis_message_type(message)) {
+ struct stasis_subscription_change *change = stasis_message_data(message);
+ /* If this is the last message this caching topic will ever receive */
+ if (!strcmp("Unsubscribe", change->description) && !strcmp(change->uniqueid, caching_topic->uniqueid)) {
+ caching_topic_needs_unref = caching_topic;
+ }
+ }
/* Handle cache clear event */
if (cache_clear_data() == stasis_message_type(message)) {
@@ -384,13 +396,13 @@
}
}
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn, struct stasis_subscription **sub)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
- struct stasis_subscription *sub;
RAII_VAR(char *, new_name, NULL, free);
int ret;
+ ast_assert(sub != NULL);
ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
if (ret < 0) {
return NULL;
@@ -414,12 +426,20 @@
caching_topic->id_fn = id_fn;
- sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
- if (sub == NULL) {
- return NULL;
- }
-
- ao2_ref(caching_topic, +1);
+ *sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
+ if (*sub == NULL) {
+ return NULL;
+ }
+
+ caching_topic->uniqueid = ast_strdup(stasis_subscription_uniqueid(*sub));
+ if (caching_topic->uniqueid == NULL) {
+ return NULL;
+ }
+
+ /* Bump this one extra because of the reference living in the
+ * subscription that will be cleared by the callback upon the
+ * subscription being cancelled */
+ ao2_ref(caching_topic, +2);
return caching_topic;
}
Modified: team/kmoore/stasis-mwi/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-mwi/tests/test_stasis.c?view=diff&rev=382439&r1=382438&r2=382439
==============================================================================
--- team/kmoore/stasis-mwi/tests/test_stasis.c (original)
+++ team/kmoore/stasis-mwi/tests/test_stasis.c Mon Mar 4 23:40:52 2013
@@ -254,7 +254,7 @@
AST_TEST_DEFINE(subscribe)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, uut, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
@@ -304,10 +304,10 @@
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, uut, NULL, ao2_cleanup);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ struct stasis_subscription *uut = NULL;
int actual_len;
switch (cmd) {
@@ -360,9 +360,9 @@
RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, forward_sub, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, parent_sub, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
@@ -473,10 +473,11 @@
RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
int actual_len;
struct stasis_message_type *actual_type;
@@ -505,7 +506,7 @@
actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
- caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+ caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create();
ast_test_validate(test, NULL != consumer);
@@ -544,10 +545,11 @@
RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer_non_caching, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub_non_caching, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
@@ -581,7 +583,7 @@
actual_len = consumer_wait_for(consumer_non_caching, 1, SUBSCRIPTION);
ast_test_validate(test, 1 == consumer_non_caching->subs_rxed);
- caching_topic = stasis_caching_topic_create(topic, cache_test_data_id);
+ caching_topic = stasis_caching_topic_create(topic, cache_test_data_id, &cache_sub);
ast_test_validate(test, NULL != caching_topic);
consumer = consumer_create();
ast_test_validate(test, NULL != consumer);
More information about the asterisk-commits mailing list