[asterisk-commits] dlee: trunk r395954 - in /trunk: apps/ apps/confbridge/ channels/ include/ast...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Aug 1 08:49:39 CDT 2013


Author: dlee
Date: Thu Aug  1 08:49:34 2013
New Revision: 395954

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=395954
Log:
Split caching out from the stasis_caching_topic.

In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.

To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.

In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:

     single_topic  ---------------->  all_topic
           ^
           |
     single_topic_cached  ----+---->  all_topic_cached
                              |
                              +---->  cache

This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_cache_pattern.h. This avoids a lot of duplicate code between
the different domain objects.

Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.

(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/

Added:
    trunk/include/asterisk/stasis_cache_pattern.h
      - copied unchanged from r395953, team/dlee/stasis-cache-split/include/asterisk/stasis_cache_pattern.h
    trunk/main/stasis_cache_pattern.c
      - copied unchanged from r395953, team/dlee/stasis-cache-split/main/stasis_cache_pattern.c
    trunk/main/stasis_wait.c
      - copied unchanged from r395953, team/dlee/stasis-cache-split/main/stasis_wait.c
Modified:
    trunk/apps/app_meetme.c
    trunk/apps/app_voicemail.c
    trunk/apps/confbridge/confbridge_manager.c
    trunk/channels/chan_dahdi.c
    trunk/channels/chan_iax2.c
    trunk/channels/chan_mgcp.c
    trunk/channels/chan_sip.c
    trunk/channels/chan_unistim.c
    trunk/channels/sig_pri.c
    trunk/include/asterisk/app.h
    trunk/include/asterisk/bridge.h
    trunk/include/asterisk/channel.h
    trunk/include/asterisk/channel_internal.h
    trunk/include/asterisk/devicestate.h
    trunk/include/asterisk/presencestate.h
    trunk/include/asterisk/stasis.h
    trunk/include/asterisk/stasis_bridges.h
    trunk/include/asterisk/stasis_channels.h
    trunk/include/asterisk/stasis_endpoints.h
    trunk/main/app.c
    trunk/main/bridge.c
    trunk/main/cdr.c
    trunk/main/cel.c
    trunk/main/channel_internal_api.c
    trunk/main/cli.c
    trunk/main/devicestate.c
    trunk/main/endpoints.c
    trunk/main/manager.c
    trunk/main/manager_bridges.c
    trunk/main/manager_channels.c
    trunk/main/manager_endpoints.c
    trunk/main/pbx.c
    trunk/main/presencestate.c
    trunk/main/stasis.c
    trunk/main/stasis_bridges.c
    trunk/main/stasis_cache.c
    trunk/main/stasis_channels.c
    trunk/main/stasis_endpoints.c
    trunk/res/ari/resource_bridges.c
    trunk/res/ari/resource_channels.c
    trunk/res/ari/resource_endpoints.c
    trunk/res/res_agi.c
    trunk/res/res_chan_stats.c
    trunk/res/res_jabber.c
    trunk/res/res_stasis.c
    trunk/res/res_xmpp.c
    trunk/res/stasis/control.c
    trunk/tests/test_cel.c
    trunk/tests/test_devicestate.c
    trunk/tests/test_stasis.c
    trunk/tests/test_stasis_endpoints.c

Modified: trunk/apps/app_meetme.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_meetme.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/apps/app_meetme.c (original)
+++ trunk/apps/app_meetme.c Thu Aug  1 08:49:34 2013
@@ -1167,7 +1167,7 @@
 	STASIS_MESSAGE_TYPE_INIT(meetme_talk_request_type);
 
 	meetme_event_message_router = stasis_message_router_create(
-		stasis_caching_get_topic(ast_channel_topic_all_cached()));
+		ast_channel_cache());
 
 	if (!meetme_event_message_router) {
 		meetme_stasis_cleanup();

Modified: trunk/apps/app_voicemail.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_voicemail.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/apps/app_voicemail.c (original)
+++ trunk/apps/app_voicemail.c Thu Aug  1 08:49:34 2013
@@ -12640,7 +12640,7 @@
 	mwi_sub_sub = stasis_subscribe(ast_mwi_topic_all(), mwi_event_cb, NULL);
 
 	if (mwi_sub_sub) {
-		struct ao2_container *cached = stasis_cache_dump(ast_mwi_topic_cached(), stasis_subscription_change_type());
+		struct ao2_container *cached = stasis_cache_dump(ast_mwi_state_cache(), stasis_subscription_change_type());
 		if (cached) {
 			ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL);
 		}

Modified: trunk/apps/confbridge/confbridge_manager.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/confbridge/confbridge_manager.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/apps/confbridge/confbridge_manager.c (original)
+++ trunk/apps/confbridge/confbridge_manager.c Thu Aug  1 08:49:34 2013
@@ -344,7 +344,7 @@
 	STASIS_MESSAGE_TYPE_INIT(confbridge_talking_type);
 
 	bridge_state_router = stasis_message_router_create(
-		stasis_caching_get_topic(ast_bridge_topic_all_cached()));
+		ast_bridge_topic_all_cached());
 
 	if (!bridge_state_router) {
 		return -1;
@@ -415,7 +415,7 @@
 	}
 
 	channel_state_router = stasis_message_router_create(
-		stasis_caching_get_topic(ast_channel_topic_all_cached()));
+		ast_channel_topic_all_cached());
 
 	if (!channel_state_router) {
 		manager_confbridge_shutdown();

Modified: trunk/channels/chan_dahdi.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_dahdi.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_dahdi.c (original)
+++ trunk/channels/chan_dahdi.c Thu Aug  1 08:49:34 2013
@@ -4822,7 +4822,7 @@
 	}
 
 	ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
-	mwi_message = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+	mwi_message = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
 
 	if (mwi_message) {
 		struct ast_mwi_state *mwi_state = stasis_message_data(mwi_message);

Modified: trunk/channels/chan_iax2.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_iax2.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_iax2.c (original)
+++ trunk/channels/chan_iax2.c Thu Aug  1 08:49:34 2013
@@ -8803,7 +8803,7 @@
 			}
 
 			ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
-			msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+			msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
 
 			if (msg) {
 				struct ast_mwi_state *mwi_state = stasis_message_data(msg);

Modified: trunk/channels/chan_mgcp.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_mgcp.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_mgcp.c (original)
+++ trunk/channels/chan_mgcp.c Thu Aug  1 08:49:34 2013
@@ -508,7 +508,7 @@
 
 	ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx);
 
-	msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+	msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
 
 	if (msg) {
 		struct ast_mwi_state *mwi_state = stasis_message_data(msg);

Modified: trunk/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_sip.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_sip.c (original)
+++ trunk/channels/chan_sip.c Thu Aug  1 08:49:34 2013
@@ -28386,7 +28386,7 @@
 		ast_str_reset(uniqueid);
 		ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default"));
 
-		msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+		msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
 		if (!msg) {
 			continue;
 		}

Modified: trunk/channels/chan_unistim.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_unistim.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_unistim.c (original)
+++ trunk/channels/chan_unistim.c Thu Aug  1 08:49:34 2013
@@ -5502,7 +5502,7 @@
 
 	ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
 
-	msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+	msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
 
 	if (msg) {
 		struct ast_mwi_state *mwi_state = stasis_message_data(msg);

Modified: trunk/channels/sig_pri.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/sig_pri.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/sig_pri.c (original)
+++ trunk/channels/sig_pri.c Thu Aug  1 08:49:34 2013
@@ -8956,7 +8956,7 @@
 		ast_str_reset(uniqueid);
 		ast_str_set(&uniqueid, 0, "%s@%s", pri->mbox[idx].number, pri->mbox[idx].context);
 
-		msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+		msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
 		if (!msg) {
 			/* No cached event for this mailbox. */
 			continue;

Modified: trunk/include/asterisk/app.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/app.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/app.h (original)
+++ trunk/include/asterisk/app.h Thu Aug  1 08:49:34 2013
@@ -1251,7 +1251,13 @@
  * \retval NULL if it has not been allocated
  * \since 12
  */
-struct stasis_caching_topic *ast_mwi_topic_cached(void);
+struct stasis_topic *ast_mwi_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_mwi_topic_cached().
+ * \retval Cache of \ref ast_mwi_state.
+ */
+struct stasis_cache *ast_mwi_state_cache(void);
 
 /*!
  * \brief Get the \ref stasis message type for MWI messages

Modified: trunk/include/asterisk/bridge.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/bridge.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/bridge.h (original)
+++ trunk/include/asterisk/bridge.h Thu Aug  1 08:49:34 2013
@@ -277,6 +277,8 @@
 	struct ast_bridge_technology *technology;
 	/*! Private information unique to the bridge technology */
 	void *tech_pvt;
+	/*! Per-bridge topics */
+	struct stasis_cp_single *topics;
 	/*! Call ID associated with the bridge */
 	struct ast_callid *callid;
 	/*! Linked list of channels participating in the bridge */

Modified: trunk/include/asterisk/channel.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/channel.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/channel.h (original)
+++ trunk/include/asterisk/channel.h Thu Aug  1 08:49:34 2013
@@ -4186,6 +4186,21 @@
 struct stasis_topic *ast_channel_topic(struct ast_channel *chan);
 
 /*!
+ * \since 12
+ * \brief A topic which publishes the events for a particular channel.
+ *
+ * \ref ast_channel_snapshot messages are replaced with \ref stasis_cache_update
+ *
+ * If the given \a chan is \c NULL, ast_channel_topic_all_cached() is returned.
+ *
+ * \param chan Channel, or \c NULL.
+ *
+ * \retval Topic for channel's events.
+ * \retval ast_channel_topic_all_cached() if \a chan is \c NULL.
+ */
+struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan);
+
+/*!
  * \brief Get the bridge associated with a channel
  * \since 12.0.0
  *

Modified: trunk/include/asterisk/channel_internal.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/channel_internal.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/channel_internal.h (original)
+++ trunk/include/asterisk/channel_internal.h Thu Aug  1 08:49:34 2013
@@ -23,5 +23,5 @@
 void ast_channel_internal_finalize(struct ast_channel *chan);
 int ast_channel_internal_is_finalized(struct ast_channel *chan);
 void ast_channel_internal_cleanup(struct ast_channel *chan);
-void ast_channel_internal_setup_topics(struct ast_channel *chan);
+int ast_channel_internal_setup_topics(struct ast_channel *chan);
 

Modified: trunk/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/devicestate.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/devicestate.h (original)
+++ trunk/include/asterisk/devicestate.h Thu Aug  1 08:49:34 2013
@@ -307,7 +307,14 @@
  * \retval NULL if it has not been allocated
  * \since 12
  */
-struct stasis_caching_topic *ast_device_state_topic_cached(void);
+struct stasis_topic *ast_device_state_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_device_state_topic_cached()
+ * \retval Cache of \ref ast_device_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_device_state_cache(void);
 
 /*!
  * \brief Get the Stasis message type for device state messages

Modified: trunk/include/asterisk/presencestate.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/presencestate.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/presencestate.h (original)
+++ trunk/include/asterisk/presencestate.h Thu Aug  1 08:49:34 2013
@@ -168,7 +168,14 @@
  * \retval Caching Stasis topic for presence state messages
  * \since 12
  */
-struct stasis_caching_topic *ast_presence_state_topic_cached(void);
+struct stasis_topic *ast_presence_state_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_presence_state_topic_cached()
+ * \retval Cache of \ref ast_presence_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_presence_state_cache(void);
 
 /*!
  * \brief Stasis message payload representing a presence state update

Modified: trunk/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis.h (original)
+++ trunk/include/asterisk/stasis.h Thu Aug  1 08:49:34 2013
@@ -94,13 +94,24 @@
  * in the system, and it's desirable to query that state from the cache without
  * locking the original object. It's also desirable for subscribers of the
  * caching topic to receive messages that have both the old cache value and the
- * new value being put into the cache. For this, we have
- * stasis_caching_topic_create(), providing it with the topic which publishes
- * the messages that you wish to cache, and a function that can identify
- * cacheable messages.
- *
- * The returned \ref stasis_caching_topic provides a topic that forwards
- * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
+ * new value being put into the cache. For this, we have stasis_cache_create()
+ * and stasis_caching_topic_create(), providing them with the topic which
+ * publishes the messages that you wish to cache, and a function that can
+ * identify cacheable messages.
+ *
+ * The \ref stasis_cache is designed so that it may be shared amongst several
+ * \ref stasis_caching_topic objects. This allows you to have individual caching
+ * topics per-object (i.e. so you can subscribe to updates for a single object),
+ * and still have a single cache to query for the state of all objects. While a
+ * cache may be shared amongst different message types, such a usage is probably
+ * not a good idea.
+ *
+ * The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
+ * It's a thread safe container, so freely use the stasis_cache_get() and
+ * stasis_cache_dump() to query the cache.
+ *
+ * The \ref stasis_caching_topic provides a topic that forwards non-cacheable
+ * messages unchanged. A cacheable message is wrapped in a \ref
  * stasis_cache_update message which provides the old snapshot (or \c NULL if
  * this is a new cache entry), and the new snapshot (or \c NULL if the entry was
  * removed from the cache). A stasis_cache_clear_create() message must be sent
@@ -110,6 +121,9 @@
  * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
  * stasis_caching_topic will not be freed until after it has been unsubscribed,
  * and all other ao2_ref()'s have been cleaned up.
+ *
+ * The \ref stasis_cache object is a normal AO2 managed object, which can be
+ * released with ao2_cleanup().
  *
  * \par stasis_subscriber
  *
@@ -345,6 +359,15 @@
 			    struct stasis_topic *publisher_topic,
 			    struct stasis_message *message);
 
+/*!
+ * \brief Wait for all pending messages on a given topic to be processed.
+ * \param topic Topic to await pending messages on.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_topic_wait(struct stasis_topic *topic);
+
 /*! @} */
 
 /*! @{ */
@@ -513,6 +536,8 @@
 struct stasis_message_type *stasis_subscription_change_type(void);
 
 /*! @} */
+
+/*! @{ */
 
 /*!
  * \brief Pool for topic aggregation
@@ -575,22 +600,17 @@
 /*! @{ */
 
 /*!
+ * \brief A message cache, for use with \ref stasis_caching_topic.
+ * \since 12
+ */
+struct stasis_cache;
+
+/*!
  * \brief A topic wrapper, which caches certain messages.
  * \since 12
  */
 struct stasis_caching_topic;
 
-/*!
- * \brief A message which instructs the caching topic to remove an entry from its cache.
- *
- * \param message Message representative of the cache entry that should be cleared.
- *     This will become the data held in the stasis_cache_clear message.
- *
- * \return Message which, when sent to the \a topic, will clear the item from the cache.
- * \return \c NULL on error.
- * \since 12
- */
-struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
 
 /*!
  * \brief Callback extract a unique identity from a snapshot message.
@@ -604,6 +624,21 @@
  * \since 12
  */
 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
+
+/*!
+ * \brief Create a cache.
+ *
+ * This is the backend store for a \ref stasis_caching_topic. The cache is
+ * thread safe, allowing concurrent reads and writes.
+ *
+ * The returned object is AO2 managed, so ao2_cleanup() when you're done.
+ *
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \return New cache indexed by \a id_fn.
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
 
 /*!
  * \brief Create a topic which monitors and caches messages from another topic.
@@ -613,13 +648,17 @@
  * is updated, and a stasis_cache_update() message is forwarded, which has both
  * the original snapshot message and the new message.
  *
+ * The returned object is AO2 managed, so ao2_cleanup() when done with it.
+ *
  * \param original_topic Topic publishing snapshot messages.
- * \param id_fn Callback to extract the id from a snapshot message.
+ * \param cache Backend cache in which to keep snapshots.
  * \return New topic which changes snapshot messages to stasis_cache_update()
  *         messages, and forwards all other messages from the original topic.
- * \since 12
- */
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_topic_create(
+	struct stasis_topic *original_topic, struct stasis_cache *cache);
 
 /*!
  * \brief Unsubscribes a caching topic from its upstream topic.
@@ -651,53 +690,55 @@
 /*!
  * \brief Returns the topic of cached events from a caching topics.
  * \param caching_topic The caching topic.
- * \return The topic that publishes cache update events, along with passthrough events
- *         from the underlying topic.
+ * \return The topic that publishes cache update events, along with passthrough
+ *         events from the underlying topic.
  * \return \c NULL if \a caching_topic is \c NULL.
  * \since 12
  */
-struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
+struct stasis_topic *stasis_caching_get_topic(
+	struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from
+ * its cache.
+ *
+ * \param message Message representative of the cache entry that should be
+ *                cleared. This will become the data held in the
+ *                stasis_cache_clear message.
+ *
+ * \return Message which, when sent to a \ref stasis_caching_topic, will clear
+ *         the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
 
 /*!
  * \brief Retrieve an item from the cache.
  *
  * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
  *
- * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param cache The cache to query.
  * \param type Type of message to retrieve.
  * \param id Identity of the snapshot to retrieve.
  * \return Message from the cache.
  * \return \c NULL if message is not found.
  * \since 12
  */
-#define stasis_cache_get(caching_topic, type, id) stasis_cache_get_extended(caching_topic, type, id, 0)
-
-/*!
- * \brief Retrieve an item from the cache.
- * \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to retrieve.
- * \param id Identity of the snapshot to retrieve.
- * \param guaranteed If set to 1 it is guaranteed that any pending messages have been processed.
- * \return Message from the cache. The cache still owns the message, so
- *         ao2_ref() if you want to keep it.
- * \return \c NULL if message is not found.
- * \since 12
- */
-struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic,
-					struct stasis_message_type *type,
-					const char *id,
-					unsigned int guaranteed);
+struct stasis_message *stasis_cache_get(
+	struct stasis_cache *cache, struct stasis_message_type *type,
+	const char *id);
 
 /*!
  * \brief Dump cached items to a subscription
- * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param cache The cache to query.
  * \param type Type of message to dump (any type if \c NULL).
  * \return ao2_container containing all matches (must be unreffed by caller)
  * \return \c NULL on allocation error
  * \since 12
  */
-struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
-					struct stasis_message_type *type);
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache,
+	struct stasis_message_type *type);
 
 /*! @} */
 
@@ -831,12 +872,17 @@
 
 /*!
  * \internal
- * \brief called by stasis_init for config initialization.
+ * \brief called by stasis_init() for config initialization.
  * \return 0 on success.
  * \return Non-zero on error.
  * \since 12
  */
 int stasis_config_init(void);
+
+/*!
+ * \internal
+ */
+int stasis_wait_init(void);
 
 struct ast_threadpool_options;
 

Modified: trunk/include/asterisk/stasis_bridges.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_bridges.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis_bridges.h (original)
+++ trunk/include/asterisk/stasis_bridges.h Thu Aug  1 08:49:34 2013
@@ -91,6 +91,22 @@
 
 /*!
  * \since 12
+ * \brief A topic which publishes the events for a particular bridge.
+ *
+ * \ref ast_bridge_snapshot messages are replaced with stasis_cache_update
+ * messages.
+ *
+ * If the given \a bridge is \c NULL, ast_bridge_topic_all_cached() is returned.
+ *
+ * \param bridge Bridge for which to get a topic or \c NULL.
+ *
+ * \retval Topic for bridge's events.
+ * \retval ast_bridge_topic_all_cached() if \a bridge is \c NULL.
+ */
+struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge);
+
+/*!
+ * \since 12
  * \brief A topic which publishes the events for all bridges.
  * \retval Topic for all bridge events.
  */
@@ -103,7 +119,14 @@
  *
  * \retval Caching topic for all bridge events.
  */
-struct stasis_caching_topic *ast_bridge_topic_all_cached(void);
+struct stasis_topic *ast_bridge_topic_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Backend cache for ast_bridge_topic_all_cached().
+ * \retval Cache of \ref ast_bridge_snapshot.
+ */
+struct stasis_cache *ast_bridge_cache(void);
 
 /*!
  * \since 12
@@ -408,6 +431,15 @@
 	const char *bridge_id);
 
 /*!
+ * \internal
+ * \brief Initialize the topics for a single bridge.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ */
+int bridge_topics_init(struct ast_bridge *bridge);
+
+/*!
+ * \internal
  * \brief Initialize the stasis bridging topic and message types
  * \retval 0 on success
  * \retval -1 on failure

Modified: trunk/include/asterisk/stasis_channels.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_channels.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis_channels.h (original)
+++ trunk/include/asterisk/stasis_channels.h Thu Aug  1 08:49:34 2013
@@ -106,6 +106,8 @@
  */
 struct ast_multi_channel_blob;
 
+struct stasis_cp_all *ast_channel_cache_all(void);
+
 /*!
  * \since 12
  * \brief A topic which publishes the events for all channels.
@@ -120,16 +122,23 @@
  *
  * \retval Topic for all channel events.
  */
-struct stasis_caching_topic *ast_channel_topic_all_cached(void);
-
-/*!
- * \since 12
- * \brief A caching topic which caches \ref ast_channel_snapshot messages from
- * ast_channel_events_all(void) and indexes them by name.
- *
- * \retval Topic for all channel events.
- */
-struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void);
+struct stasis_topic *ast_channel_topic_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Primary channel cache, indexed by Uniqueid.
+ *
+ * \retval Cache of \ref ast_channel_snapshot.
+ */
+struct stasis_cache *ast_channel_cache(void);
+
+/*!
+ * \since 12
+ * \brief Secondary channel cache, indexed by name.
+ *
+ * \retval Cache of \ref ast_channel_snapshot.
+ */
+struct stasis_cache *ast_channel_cache_by_name(void);
 
 /*!
  * \since 12
@@ -551,7 +560,9 @@
 
 /*!
  * \brief Initialize the stasis channel topic and message types
- */
-void ast_stasis_channels_init(void);
+ * \return 0 on success
+ * \return Non-zero on error
+ */
+int ast_stasis_channels_init(void);
 
 #endif /* STASIS_CHANNELS_H_ */

Modified: trunk/include/asterisk/stasis_endpoints.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_endpoints.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis_endpoints.h (original)
+++ trunk/include/asterisk/stasis_endpoints.h Thu Aug  1 08:49:34 2013
@@ -30,6 +30,7 @@
 #include "asterisk/endpoints.h"
 #include "asterisk/json.h"
 #include "asterisk/stasis.h"
+#include "asterisk/stasis_cache_pattern.h"
 #include "asterisk/stringfields.h"
 
 /*! \addtogroup StasisTopicsAndMessages
@@ -144,6 +145,31 @@
 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint);
 
 /*!
+ * \brief Returns the topic for a specific endpoint.
+ *
+ * \ref ast_endpoint_snapshot messages are replaced with
+ * \ref stasis_cache_update
+ *
+ * \param endpoint The endpoint.
+ * \return The topic for the given endpoint.
+ * \return ast_endpoint_topic_all() if endpoint is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint);
+
+/*!
+ * \internal
+ * \brief Cache and global topics for endpoints.
+ *
+ * This is public simply to be used by endpoints.c. Please use the accessor
+ * functions (ast_endpoint_topic_all(), ast_endpoint_topic_all_cached(),
+ * ast_endpoint_cache(), etc.) instead of calling this directly.
+ *
+ * \since 12
+ */
+struct stasis_cp_all *ast_endpoint_cache_all(void);
+
+/*!
  * \brief Topic for all endpoint releated messages.
  * \since 12
  */
@@ -153,7 +179,14 @@
  * \brief Cached topic for all endpoint related messages.
  * \since 12
  */
-struct stasis_caching_topic *ast_endpoint_topic_all_cached(void);
+struct stasis_topic *ast_endpoint_topic_all_cached(void);
+
+/*!
+ * \brief Backend cache for ast_endpoint_topic_all_cached().
+ * \return Cache of \ref ast_endpoint_snapshot.
+ * \since 12
+ */
+struct stasis_cache *ast_endpoint_cache(void);
 
 /*!
  * \brief Retrieve the most recent snapshot for the endpoint with the given

Modified: trunk/main/app.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/app.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/app.c (original)
+++ trunk/main/app.c Thu Aug  1 08:49:34 2013
@@ -88,6 +88,7 @@
  * @{ \brief Define \ref stasis topic objects for MWI
  */
 static struct stasis_topic *mwi_topic_all;
+static struct stasis_cache *mwi_state_cache;
 static struct stasis_caching_topic *mwi_topic_cached;
 static struct stasis_topic_pool *mwi_topic_pool;
 /* @} */
@@ -2696,9 +2697,14 @@
 	return mwi_topic_all;
 }
 
-struct stasis_caching_topic *ast_mwi_topic_cached(void)
-{
-	return mwi_topic_cached;
+struct stasis_cache *ast_mwi_state_cache(void)
+{
+	return mwi_state_cache;
+}
+
+struct stasis_topic *ast_mwi_topic_cached(void)
+{
+	return stasis_caching_get_topic(mwi_topic_cached);
 }
 
 struct stasis_topic *ast_mwi_topic(const char *uniqueid)
@@ -2754,7 +2760,7 @@
 
 	if (!ast_strlen_zero(channel_id)) {
 		RAII_VAR(struct stasis_message *, chan_message,
-			stasis_cache_get(ast_channel_topic_all_cached(),
+			stasis_cache_get(ast_channel_cache(),
 					ast_channel_snapshot_type(),
 					channel_id),
 			ao2_cleanup);
@@ -2855,7 +2861,11 @@
 	if (!mwi_topic_all) {
 		return -1;
 	}
-	mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+	mwi_state_cache = stasis_cache_create(mwi_state_get_id);
+	if (!mwi_state_cache) {
+		return -1;
+	}
+	mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
 	if (!mwi_topic_cached) {
 		return -1;
 	}

Modified: trunk/main/bridge.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/bridge.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/bridge.c (original)
+++ trunk/main/bridge.c Thu Aug  1 08:49:34 2013
@@ -46,6 +46,7 @@
 #include "asterisk/bridge_after.h"
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_cache_pattern.h"
 #include "asterisk/app.h"
 #include "asterisk/file.h"
 #include "asterisk/module.h"
@@ -634,6 +635,8 @@
 	}
 
 	cleanup_video_mode(bridge);
+
+	stasis_cp_single_unsubscribe(bridge->topics);
 }
 
 struct ast_bridge *bridge_register(struct ast_bridge *bridge)
@@ -684,6 +687,13 @@
 	ast_uuid_generate_str(self->uniqueid, sizeof(self->uniqueid));
 	ast_set_flag(&self->feature_flags, flags);
 	self->allowed_capabilities = capabilities;
+
+	if (bridge_topics_init(self) != 0) {
+		ast_log(LOG_WARNING, "Bridge %s: Could not initialize topics\n",
+			self->uniqueid);
+		ao2_ref(self, -1);
+		return NULL;
+	}
 
 	/* Use our helper function to find the "best" bridge technology. */
 	self->technology = find_best_technology(capabilities, self);
@@ -4397,7 +4407,7 @@
 	struct ao2_iterator iter;
 	struct stasis_message *msg;
 
-	if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+	if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
 		return NULL;
 	}
 
@@ -4435,7 +4445,7 @@
 		return NULL;
 	}
 
-	if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+	if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
 		ast_cli(a->fd, "Failed to retrieve cached bridges\n");
 		return CLI_SUCCESS;
 	}
@@ -4467,7 +4477,7 @@
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	struct ast_channel_snapshot *snapshot;
 
-	if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) {
+	if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) {
 		return 0;
 	}
 	snapshot = stasis_message_data(msg);
@@ -4500,7 +4510,7 @@
 		return CLI_SHOWUSAGE;
 	}
 
-	msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]);
+	msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
 	if (!msg) {
 		ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
 		return CLI_SUCCESS;

Modified: trunk/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cdr.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/cdr.c (original)
+++ trunk/main/cdr.c Thu Aug  1 08:49:34 2013
@@ -4005,11 +4005,11 @@
 		return -1;
 	}
 
-	channel_subscription = stasis_forward_all(stasis_caching_get_topic(ast_channel_topic_all_cached()), cdr_topic);
+	channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
 	if (!channel_subscription) {
 		return -1;
 	}
-	bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic);
+	bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
 	if (!bridge_subscription) {
 		return -1;
 	}

Modified: trunk/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cel.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/cel.c (original)
+++ trunk/main/cel.c Thu Aug  1 08:49:34 2013
@@ -1551,14 +1551,14 @@
 	}
 
 	cel_channel_forwarder = stasis_forward_all(
-		stasis_caching_get_topic(ast_channel_topic_all_cached()),
+		ast_channel_topic_all_cached(),
 		cel_aggregation_topic);
 	if (!cel_channel_forwarder) {
 		return -1;
 	}
 
 	cel_bridge_forwarder = stasis_forward_all(
-		stasis_caching_get_topic(ast_bridge_topic_all_cached()),
+		ast_bridge_topic_all_cached(),
 		cel_aggregation_topic);
 	if (!cel_bridge_forwarder) {
 		return -1;

Modified: trunk/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/channel_internal_api.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/channel_internal_api.c (original)
+++ trunk/main/channel_internal_api.c Thu Aug  1 08:49:34 2013
@@ -44,6 +44,7 @@
 #include "asterisk/data.h"
 #include "asterisk/endpoints.h"
 #include "asterisk/indications.h"
+#include "asterisk/stasis_cache_pattern.h"
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/stringfields.h"
@@ -208,7 +209,7 @@
 	char dtmf_digit_to_emulate;			/*!< Digit being emulated */
 	char sending_dtmf_digit;			/*!< Digit this channel is currently sending out. (zero if not sending) */
 	struct timeval sending_dtmf_tv;		/*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
-	struct stasis_topic *topic;			/*!< Topic for all channel's events */
+	struct stasis_cp_single *topics;		/*!< Topic for all channel's events */
 	struct stasis_subscription *forwarder;		/*!< Subscription for event forwarding to all topic */
 	struct stasis_subscription *endpoint_forward;	/*!< Subscription for event forwarding to endpoint's topic */
 };
@@ -1434,8 +1435,8 @@
 	chan->forwarder = stasis_unsubscribe(chan->forwarder);
 	chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
 
-	ao2_cleanup(chan->topic);
-	chan->topic = NULL;
+	stasis_cp_single_unsubscribe(chan->topics);
+	chan->topics = NULL;
 }
 
 void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1450,16 +1451,31 @@
 
 struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
 {
-	return chan ? chan->topic : ast_channel_topic_all();
-}
-
-int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint)
+	if (!chan) {
+		return ast_channel_topic_all();
+	}
+
+	return stasis_cp_single_topic(chan->topics);
+}
+
+struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan)
+{
+	if (!chan) {
+		return ast_channel_topic_all_cached();
+	}
+
+	return stasis_cp_single_topic_cached(chan->topics);
+}
+
+int ast_channel_forward_endpoint(struct ast_channel *chan,
+	struct ast_endpoint *endpoint)
 {
 	ast_assert(chan != NULL);
 	ast_assert(endpoint != NULL);
 
 	chan->endpoint_forward =
-		stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint));
+		stasis_forward_all(ast_channel_topic(chan),
+			ast_endpoint_topic(endpoint));
 
 	if (chan->endpoint_forward == NULL) {
 		return -1;
@@ -1468,19 +1484,21 @@
 	return 0;
 }
 
-void ast_channel_internal_setup_topics(struct ast_channel *chan)
+int ast_channel_internal_setup_topics(struct ast_channel *chan)
 {
 	const char *topic_name = chan->uniqueid;
-	ast_assert(chan->topic == NULL);
-	ast_assert(chan->forwarder == NULL);
+	ast_assert(chan->topics == NULL);
 
 	if (ast_strlen_zero(topic_name)) {
 		topic_name = "<dummy-channel>";
 	}
 
-	chan->topic = stasis_topic_create(topic_name);
-	chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
-
-	ast_assert(chan->topic != NULL);
-	ast_assert(chan->forwarder != NULL);
-}
+	chan->topics = stasis_cp_single_create(
+		ast_channel_cache_all(), topic_name);
+
+	if (!chan->topics) {
+		return -1;
+	}
+
+	return 0;
+}

Modified: trunk/main/cli.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cli.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/cli.c (original)
+++ trunk/main/cli.c Thu Aug  1 08:49:34 2013
@@ -915,7 +915,7 @@
 		return CLI_SHOWUSAGE;
 
 
-	if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+	if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
 		ast_cli(a->fd, "Failed to retrieve cached channels\n");
 		return CLI_SUCCESS;
 	}
@@ -1438,7 +1438,7 @@
 
 	now = ast_tvnow();
 
-	if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+	if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
 		ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]);

[... 1866 lines stripped ...]



More information about the asterisk-commits mailing list