[asterisk-commits] dlee: trunk r395954 - in /trunk: apps/ apps/confbridge/ channels/ include/ast...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Aug 1 08:49:39 CDT 2013
Author: dlee
Date: Thu Aug 1 08:49:34 2013
New Revision: 395954
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=395954
Log:
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.
To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.
In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:
single_topic ----------------> all_topic
^
|
single_topic_cached ----+----> all_topic_cached
|
+----> cache
This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_caching_pattern.h. This avoids a lot of duplicate code between
the different domain objects.
Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.
(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/
Added:
trunk/include/asterisk/stasis_cache_pattern.h
- copied unchanged from r395953, team/dlee/stasis-cache-split/include/asterisk/stasis_cache_pattern.h
trunk/main/stasis_cache_pattern.c
- copied unchanged from r395953, team/dlee/stasis-cache-split/main/stasis_cache_pattern.c
trunk/main/stasis_wait.c
- copied unchanged from r395953, team/dlee/stasis-cache-split/main/stasis_wait.c
Modified:
trunk/apps/app_meetme.c
trunk/apps/app_voicemail.c
trunk/apps/confbridge/confbridge_manager.c
trunk/channels/chan_dahdi.c
trunk/channels/chan_iax2.c
trunk/channels/chan_mgcp.c
trunk/channels/chan_sip.c
trunk/channels/chan_unistim.c
trunk/channels/sig_pri.c
trunk/include/asterisk/app.h
trunk/include/asterisk/bridge.h
trunk/include/asterisk/channel.h
trunk/include/asterisk/channel_internal.h
trunk/include/asterisk/devicestate.h
trunk/include/asterisk/presencestate.h
trunk/include/asterisk/stasis.h
trunk/include/asterisk/stasis_bridges.h
trunk/include/asterisk/stasis_channels.h
trunk/include/asterisk/stasis_endpoints.h
trunk/main/app.c
trunk/main/bridge.c
trunk/main/cdr.c
trunk/main/cel.c
trunk/main/channel_internal_api.c
trunk/main/cli.c
trunk/main/devicestate.c
trunk/main/endpoints.c
trunk/main/manager.c
trunk/main/manager_bridges.c
trunk/main/manager_channels.c
trunk/main/manager_endpoints.c
trunk/main/pbx.c
trunk/main/presencestate.c
trunk/main/stasis.c
trunk/main/stasis_bridges.c
trunk/main/stasis_cache.c
trunk/main/stasis_channels.c
trunk/main/stasis_endpoints.c
trunk/res/ari/resource_bridges.c
trunk/res/ari/resource_channels.c
trunk/res/ari/resource_endpoints.c
trunk/res/res_agi.c
trunk/res/res_chan_stats.c
trunk/res/res_jabber.c
trunk/res/res_stasis.c
trunk/res/res_xmpp.c
trunk/res/stasis/control.c
trunk/tests/test_cel.c
trunk/tests/test_devicestate.c
trunk/tests/test_stasis.c
trunk/tests/test_stasis_endpoints.c
Modified: trunk/apps/app_meetme.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_meetme.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/apps/app_meetme.c (original)
+++ trunk/apps/app_meetme.c Thu Aug 1 08:49:34 2013
@@ -1167,7 +1167,7 @@
STASIS_MESSAGE_TYPE_INIT(meetme_talk_request_type);
meetme_event_message_router = stasis_message_router_create(
- stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ ast_channel_cache());
if (!meetme_event_message_router) {
meetme_stasis_cleanup();
Modified: trunk/apps/app_voicemail.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_voicemail.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/apps/app_voicemail.c (original)
+++ trunk/apps/app_voicemail.c Thu Aug 1 08:49:34 2013
@@ -12640,7 +12640,7 @@
mwi_sub_sub = stasis_subscribe(ast_mwi_topic_all(), mwi_event_cb, NULL);
if (mwi_sub_sub) {
- struct ao2_container *cached = stasis_cache_dump(ast_mwi_topic_cached(), stasis_subscription_change_type());
+ struct ao2_container *cached = stasis_cache_dump(ast_mwi_state_cache(), stasis_subscription_change_type());
if (cached) {
ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL);
}
Modified: trunk/apps/confbridge/confbridge_manager.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/confbridge/confbridge_manager.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/apps/confbridge/confbridge_manager.c (original)
+++ trunk/apps/confbridge/confbridge_manager.c Thu Aug 1 08:49:34 2013
@@ -344,7 +344,7 @@
STASIS_MESSAGE_TYPE_INIT(confbridge_talking_type);
bridge_state_router = stasis_message_router_create(
- stasis_caching_get_topic(ast_bridge_topic_all_cached()));
+ ast_bridge_topic_all_cached());
if (!bridge_state_router) {
return -1;
@@ -415,7 +415,7 @@
}
channel_state_router = stasis_message_router_create(
- stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ ast_channel_topic_all_cached());
if (!channel_state_router) {
manager_confbridge_shutdown();
Modified: trunk/channels/chan_dahdi.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_dahdi.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_dahdi.c (original)
+++ trunk/channels/chan_dahdi.c Thu Aug 1 08:49:34 2013
@@ -4822,7 +4822,7 @@
}
ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
- mwi_message = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+ mwi_message = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
if (mwi_message) {
struct ast_mwi_state *mwi_state = stasis_message_data(mwi_message);
Modified: trunk/channels/chan_iax2.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_iax2.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_iax2.c (original)
+++ trunk/channels/chan_iax2.c Thu Aug 1 08:49:34 2013
@@ -8803,7 +8803,7 @@
}
ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
- msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+ msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
if (msg) {
struct ast_mwi_state *mwi_state = stasis_message_data(msg);
Modified: trunk/channels/chan_mgcp.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_mgcp.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_mgcp.c (original)
+++ trunk/channels/chan_mgcp.c Thu Aug 1 08:49:34 2013
@@ -508,7 +508,7 @@
ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx);
- msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+ msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
if (msg) {
struct ast_mwi_state *mwi_state = stasis_message_data(msg);
Modified: trunk/channels/chan_sip.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_sip.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_sip.c (original)
+++ trunk/channels/chan_sip.c Thu Aug 1 08:49:34 2013
@@ -28386,7 +28386,7 @@
ast_str_reset(uniqueid);
ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default"));
- msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+ msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
if (!msg) {
continue;
}
Modified: trunk/channels/chan_unistim.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/chan_unistim.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/chan_unistim.c (original)
+++ trunk/channels/chan_unistim.c Thu Aug 1 08:49:34 2013
@@ -5502,7 +5502,7 @@
ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
- msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+ msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
if (msg) {
struct ast_mwi_state *mwi_state = stasis_message_data(msg);
Modified: trunk/channels/sig_pri.c
URL: http://svnview.digium.com/svn/asterisk/trunk/channels/sig_pri.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/channels/sig_pri.c (original)
+++ trunk/channels/sig_pri.c Thu Aug 1 08:49:34 2013
@@ -8956,7 +8956,7 @@
ast_str_reset(uniqueid);
ast_str_set(&uniqueid, 0, "%s@%s", pri->mbox[idx].number, pri->mbox[idx].context);
- msg = stasis_cache_get(ast_mwi_topic_cached(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
+ msg = stasis_cache_get(ast_mwi_state_cache(), ast_mwi_state_type(), ast_str_buffer(uniqueid));
if (!msg) {
/* No cached event for this mailbox. */
continue;
Modified: trunk/include/asterisk/app.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/app.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/app.h (original)
+++ trunk/include/asterisk/app.h Thu Aug 1 08:49:34 2013
@@ -1251,7 +1251,13 @@
* \retval NULL if it has not been allocated
* \since 12
*/
-struct stasis_caching_topic *ast_mwi_topic_cached(void);
+struct stasis_topic *ast_mwi_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_mwi_topic_cached().
+ * \retval Cache of \ref ast_mwi_state.
+ */
+struct stasis_cache *ast_mwi_state_cache(void);
/*!
* \brief Get the \ref stasis message type for MWI messages
Modified: trunk/include/asterisk/bridge.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/bridge.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/bridge.h (original)
+++ trunk/include/asterisk/bridge.h Thu Aug 1 08:49:34 2013
@@ -277,6 +277,8 @@
struct ast_bridge_technology *technology;
/*! Private information unique to the bridge technology */
void *tech_pvt;
+ /*! Per-bridge topics */
+ struct stasis_cp_single *topics;
/*! Call ID associated with the bridge */
struct ast_callid *callid;
/*! Linked list of channels participating in the bridge */
Modified: trunk/include/asterisk/channel.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/channel.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/channel.h (original)
+++ trunk/include/asterisk/channel.h Thu Aug 1 08:49:34 2013
@@ -4186,6 +4186,21 @@
struct stasis_topic *ast_channel_topic(struct ast_channel *chan);
/*!
+ * \since 12
+ * \brief A topic which publishes the events for a particular channel.
+ *
+ * \ref ast_channel_snapshot messages are replaced with \ref stasis_cache_update
+ *
+ * If the given \a chan is \c NULL, ast_channel_topic_all_cached() is returned.
+ *
+ * \param chan Channel, or \c NULL.
+ *
+ * \retval Topic for channel's events.
+ * \retval ast_channel_topic_all() if \a chan is \c NULL.
+ */
+struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan);
+
+/*!
* \brief Get the bridge associated with a channel
* \since 12.0.0
*
Modified: trunk/include/asterisk/channel_internal.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/channel_internal.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/channel_internal.h (original)
+++ trunk/include/asterisk/channel_internal.h Thu Aug 1 08:49:34 2013
@@ -23,5 +23,5 @@
void ast_channel_internal_finalize(struct ast_channel *chan);
int ast_channel_internal_is_finalized(struct ast_channel *chan);
void ast_channel_internal_cleanup(struct ast_channel *chan);
-void ast_channel_internal_setup_topics(struct ast_channel *chan);
+int ast_channel_internal_setup_topics(struct ast_channel *chan);
Modified: trunk/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/devicestate.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/devicestate.h (original)
+++ trunk/include/asterisk/devicestate.h Thu Aug 1 08:49:34 2013
@@ -307,7 +307,14 @@
* \retval NULL if it has not been allocated
* \since 12
*/
-struct stasis_caching_topic *ast_device_state_topic_cached(void);
+struct stasis_topic *ast_device_state_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_device_state_topic_cached()
+ * \retval Cache of \ref ast_device_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_device_state_cache(void);
/*!
* \brief Get the Stasis message type for device state messages
Modified: trunk/include/asterisk/presencestate.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/presencestate.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/presencestate.h (original)
+++ trunk/include/asterisk/presencestate.h Thu Aug 1 08:49:34 2013
@@ -168,7 +168,14 @@
* \retval Caching Stasis topic for presence state messages
* \since 12
*/
-struct stasis_caching_topic *ast_presence_state_topic_cached(void);
+struct stasis_topic *ast_presence_state_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_presence_state_topic_cached()
+ * \retval Cache of \ref ast_presence_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_presence_state_cache(void);
/*!
* \brief Stasis message payload representing a presence state update
Modified: trunk/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis.h (original)
+++ trunk/include/asterisk/stasis.h Thu Aug 1 08:49:34 2013
@@ -94,13 +94,24 @@
* in the system, and it's desirable to query that state from the cache without
* locking the original object. It's also desirable for subscribers of the
* caching topic to receive messages that have both the old cache value and the
- * new value being put into the cache. For this, we have
- * stasis_caching_topic_create(), providing it with the topic which publishes
- * the messages that you wish to cache, and a function that can identify
- * cacheable messages.
- *
- * The returned \ref stasis_caching_topic provides a topic that forwards
- * non-cacheable messages unchanged. A cacheable message is wrapped in a \ref
+ * new value being put into the cache. For this, we have stasis_cache_create()
+ * and stasis_caching_topic_create(), providing them with the topic which
+ * publishes the messages that you wish to cache, and a function that can
+ * identify cacheable messages.
+ *
+ * The \ref stasis_cache is designed so that it may be shared amongst several
+ * \ref stasis_caching_topic objects. This allows you to have individual caching
+ * topics per-object (i.e. so you can subscribe to updates for a single object),
+ * and still have a single cache to query for the state of all objects. While a
+ * cache may be shared amongst different message types, such a usage is probably
+ * not a good idea.
+ *
+ * The \ref stasis_cache can only be written to by \ref stasis_caching_topics.
+ * It's a thread safe container, so freely use the stasis_cache_get() and
+ * stasis_cache_dump() to query the cache.
+ *
+ * The \ref stasis_caching_topic provides a topic that forwards non-cacheable
+ * messages unchanged. A cacheable message is wrapped in a \ref
* stasis_cache_update message which provides the old snapshot (or \c NULL if
* this is a new cache entry), and the new snapshot (or \c NULL if the entry was
* removed from the cache). A stasis_cache_clear_create() message must be sent
@@ -110,6 +121,9 @@
* call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
* stasis_caching_topic will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
+ *
+ * The \ref stasis_cache object is a normal AO2 managed object, which can be
+ * release with ao2_cleanup().
*
* \par stasis_subscriber
*
@@ -345,6 +359,15 @@
struct stasis_topic *publisher_topic,
struct stasis_message *message);
+/*!
+ * \brief Wait for all pending messages on a given topic to be processed.
+ * \param topic Topic to await pending messages on.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_topic_wait(struct stasis_topic *topic);
+
/*! @} */
/*! @{ */
@@ -513,6 +536,8 @@
struct stasis_message_type *stasis_subscription_change_type(void);
/*! @} */
+
+/*! @{ */
/*!
* \brief Pool for topic aggregation
@@ -575,22 +600,17 @@
/*! @{ */
/*!
+ * \brief A message cache, for use with \ref stasis_caching_topic.
+ * \since 12
+ */
+struct stasis_cache;
+
+/*!
* \brief A topic wrapper, which caches certain messages.
* \since 12
*/
struct stasis_caching_topic;
-/*!
- * \brief A message which instructs the caching topic to remove an entry from its cache.
- *
- * \param message Message representative of the cache entry that should be cleared.
- * This will become the data held in the stasis_cache_clear message.
- *
- * \return Message which, when sent to the \a topic, will clear the item from the cache.
- * \return \c NULL on error.
- * \since 12
- */
-struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
/*!
* \brief Callback extract a unique identity from a snapshot message.
@@ -604,6 +624,21 @@
* \since 12
*/
typedef const char *(*snapshot_get_id)(struct stasis_message *message);
+
+/*!
+ * \brief Create a cache.
+ *
+ * This is the backend store for a \ref stasis_caching_topic. The cache is
+ * thread safe, allowing concurrent reads and writes.
+ *
+ * The returned object is AO2 managed, so ao2_cleanup() when you're done.
+ *
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \return New cache indexed by \a id_fn.
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
/*!
* \brief Create a topic which monitors and caches messages from another topic.
@@ -613,13 +648,17 @@
* is updated, and a stasis_cache_update() message is forwarded, which has both
* the original snapshot message and the new message.
*
+ * The returned object is AO2 managed, so ao2_cleanup() when done with it.
+ *
* \param original_topic Topic publishing snapshot messages.
- * \param id_fn Callback to extract the id from a snapshot message.
+ * \param cache Backend cache in which to keep snapshots.
* \return New topic which changes snapshot messages to stasis_cache_update()
* messages, and forwards all other messages from the original topic.
- * \since 12
- */
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
+ * \return \c NULL on error
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_topic_create(
+ struct stasis_topic *original_topic, struct stasis_cache *cache);
/*!
* \brief Unsubscribes a caching topic from its upstream topic.
@@ -651,53 +690,55 @@
/*!
* \brief Returns the topic of cached events from a caching topics.
* \param caching_topic The caching topic.
- * \return The topic that publishes cache update events, along with passthrough events
- * from the underlying topic.
+ * \return The topic that publishes cache update events, along with passthrough
+ * events from the underlying topic.
* \return \c NULL if \a caching_topic is \c NULL.
* \since 12
*/
-struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic);
+struct stasis_topic *stasis_caching_get_topic(
+ struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief A message which instructs the caching topic to remove an entry from
+ * its cache.
+ *
+ * \param message Message representative of the cache entry that should be
+ * cleared. This will become the data held in the
+ * stasis_cache_clear message.
+ *
+ * \return Message which, when sent to a \ref stasis_caching_topic, will clear
+ * the item from the cache.
+ * \return \c NULL on error.
+ * \since 12
+ */
+struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
/*!
* \brief Retrieve an item from the cache.
*
* The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
*
- * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param cache The cache to query.
* \param type Type of message to retrieve.
* \param id Identity of the snapshot to retrieve.
* \return Message from the cache.
* \return \c NULL if message is not found.
* \since 12
*/
-#define stasis_cache_get(caching_topic, type, id) stasis_cache_get_extended(caching_topic, type, id, 0)
-
-/*!
- * \brief Retrieve an item from the cache.
- * \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to retrieve.
- * \param id Identity of the snapshot to retrieve.
- * \param guaranteed If set to 1 it is guaranteed that any pending messages have been processed.
- * \return Message from the cache. The cache still owns the message, so
- * ao2_ref() if you want to keep it.
- * \return \c NULL if message is not found.
- * \since 12
- */
-struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic,
- struct stasis_message_type *type,
- const char *id,
- unsigned int guaranteed);
+struct stasis_message *stasis_cache_get(
+ struct stasis_cache *cache, struct stasis_message_type *type,
+ const char *id);
/*!
* \brief Dump cached items to a subscription
- * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
* \return ao2_container containing all matches (must be unreffed by caller)
* \return \c NULL on allocation error
* \since 12
*/
-struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
- struct stasis_message_type *type);
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache,
+ struct stasis_message_type *type);
/*! @} */
@@ -831,12 +872,17 @@
/*!
* \internal
- * \brief called by stasis_init for config initialization.
+ * \brief called by stasis_init() for config initialization.
* \return 0 on success.
* \return Non-zero on error.
* \since 12
*/
int stasis_config_init(void);
+
+/*!
+ * \internal
+ */
+int stasis_wait_init(void);
struct ast_threadpool_options;
Modified: trunk/include/asterisk/stasis_bridges.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_bridges.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis_bridges.h (original)
+++ trunk/include/asterisk/stasis_bridges.h Thu Aug 1 08:49:34 2013
@@ -91,6 +91,22 @@
/*!
* \since 12
+ * \brief A topic which publishes the events for a particular bridge.
+ *
+ * \ref ast_bridge_snapshot messages are replaced with stasis_cache_update
+ * messages.
+ *
+ * If the given \a bridge is \c NULL, ast_bridge_topic_all_cached() is returned.
+ *
+ * \param bridge Bridge for which to get a topic or \c NULL.
+ *
+ * \retval Topic for bridge's events.
+ * \retval ast_bridge_topic_all() if \a bridge is \c NULL.
+ */
+struct stasis_topic *ast_bridge_topic_cached(struct ast_bridge *bridge);
+
+/*!
+ * \since 12
* \brief A topic which publishes the events for all bridges.
* \retval Topic for all bridge events.
*/
@@ -103,7 +119,14 @@
*
* \retval Caching topic for all bridge events.
*/
-struct stasis_caching_topic *ast_bridge_topic_all_cached(void);
+struct stasis_topic *ast_bridge_topic_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Backend cache for ast_bridge_topic_all_cached().
+ * \retval Cache of \ref ast_bridge_snapshot.
+ */
+struct stasis_cache *ast_bridge_cache(void);
/*!
* \since 12
@@ -408,6 +431,15 @@
const char *bridge_id);
/*!
+ * \internal
+ * \brief Initialize the topics for a single bridge.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ */
+int bridge_topics_init(struct ast_bridge *bridge);
+
+/*!
+ * \internal
* \brief Initialize the stasis bridging topic and message types
* \retval 0 on success
* \retval -1 on failure
Modified: trunk/include/asterisk/stasis_channels.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_channels.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis_channels.h (original)
+++ trunk/include/asterisk/stasis_channels.h Thu Aug 1 08:49:34 2013
@@ -106,6 +106,8 @@
*/
struct ast_multi_channel_blob;
+struct stasis_cp_all *ast_channel_cache_all(void);
+
/*!
* \since 12
* \brief A topic which publishes the events for all channels.
@@ -120,16 +122,23 @@
*
* \retval Topic for all channel events.
*/
-struct stasis_caching_topic *ast_channel_topic_all_cached(void);
-
-/*!
- * \since 12
- * \brief A caching topic which caches \ref ast_channel_snapshot messages from
- * ast_channel_events_all(void) and indexes them by name.
- *
- * \retval Topic for all channel events.
- */
-struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void);
+struct stasis_topic *ast_channel_topic_all_cached(void);
+
+/*!
+ * \since 12
+ * \brief Primary channel cache, indexed by Uniqueid.
+ *
+ * \retval Cache of \ref ast_channel_snapshot.
+ */
+struct stasis_cache *ast_channel_cache(void);
+
+/*!
+ * \since 12
+ * \brief Secondary channel cache, indexed by name.
+ *
+ * \retval Cache of \ref ast_channel_snapshot.
+ */
+struct stasis_cache *ast_channel_cache_by_name(void);
/*!
* \since 12
@@ -551,7 +560,9 @@
/*!
* \brief Initialize the stasis channel topic and message types
- */
-void ast_stasis_channels_init(void);
+ * \return 0 on success
+ * \return Non-zero on error
+ */
+int ast_stasis_channels_init(void);
#endif /* STASIS_CHANNELS_H_ */
Modified: trunk/include/asterisk/stasis_endpoints.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/stasis_endpoints.h?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/include/asterisk/stasis_endpoints.h (original)
+++ trunk/include/asterisk/stasis_endpoints.h Thu Aug 1 08:49:34 2013
@@ -30,6 +30,7 @@
#include "asterisk/endpoints.h"
#include "asterisk/json.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/stringfields.h"
/*! \addtogroup StasisTopicsAndMessages
@@ -144,6 +145,31 @@
struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint);
/*!
+ * \brief Returns the topic for a specific endpoint.
+ *
+ * \ref ast_endpoint_snapshot messages are replaced with
+ * \ref stasis_cache_update
+ *
+ * \param endpoint The endpoint.
+ * \return The topic for the given endpoint.
+ * \return ast_endpoint_topic_all() if endpoint is \c NULL.
+ * \since 12
+ */
+struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint);
+
+/*!
+ * \internal
+ * \brief Cache and global topics for endpoints.
+ *
+ * This is public simply to be used by endpoints.c. Please use the accessor
+ * functions (ast_endpoint_topic_all(), ast_endpoint_topic_all_cached(),
+ * ast_endpoint_cache(), etc.) instead of calling this directly.
+ *
+ * \since 12
+ */
+struct stasis_cp_all *ast_endpoint_cache_all(void);
+
+/*!
* \brief Topic for all endpoint releated messages.
* \since 12
*/
@@ -153,7 +179,14 @@
* \brief Cached topic for all endpoint related messages.
* \since 12
*/
-struct stasis_caching_topic *ast_endpoint_topic_all_cached(void);
+struct stasis_topic *ast_endpoint_topic_all_cached(void);
+
+/*!
+ * \brief Backend cache for ast_endpoint_topic_all_cached().
+ * \return Cache of \ref ast_endpoint_snapshot.
+ * \since 12
+ */
+struct stasis_cache *ast_endpoint_cache(void);
/*!
* \brief Retrieve the most recent snapshot for the endpoint with the given
Modified: trunk/main/app.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/app.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/app.c (original)
+++ trunk/main/app.c Thu Aug 1 08:49:34 2013
@@ -88,6 +88,7 @@
* @{ \brief Define \ref stasis topic objects for MWI
*/
static struct stasis_topic *mwi_topic_all;
+static struct stasis_cache *mwi_state_cache;
static struct stasis_caching_topic *mwi_topic_cached;
static struct stasis_topic_pool *mwi_topic_pool;
/* @} */
@@ -2696,9 +2697,14 @@
return mwi_topic_all;
}
-struct stasis_caching_topic *ast_mwi_topic_cached(void)
-{
- return mwi_topic_cached;
+struct stasis_cache *ast_mwi_state_cache(void)
+{
+ return mwi_state_cache;
+}
+
+struct stasis_topic *ast_mwi_topic_cached(void)
+{
+ return stasis_caching_get_topic(mwi_topic_cached);
}
struct stasis_topic *ast_mwi_topic(const char *uniqueid)
@@ -2754,7 +2760,7 @@
if (!ast_strlen_zero(channel_id)) {
RAII_VAR(struct stasis_message *, chan_message,
- stasis_cache_get(ast_channel_topic_all_cached(),
+ stasis_cache_get(ast_channel_cache(),
ast_channel_snapshot_type(),
channel_id),
ao2_cleanup);
@@ -2855,7 +2861,11 @@
if (!mwi_topic_all) {
return -1;
}
- mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+ mwi_state_cache = stasis_cache_create(mwi_state_get_id);
+ if (!mwi_state_cache) {
+ return -1;
+ }
+ mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
if (!mwi_topic_cached) {
return -1;
}
Modified: trunk/main/bridge.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/bridge.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/bridge.c (original)
+++ trunk/main/bridge.c Thu Aug 1 08:49:34 2013
@@ -46,6 +46,7 @@
#include "asterisk/bridge_after.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/app.h"
#include "asterisk/file.h"
#include "asterisk/module.h"
@@ -634,6 +635,8 @@
}
cleanup_video_mode(bridge);
+
+ stasis_cp_single_unsubscribe(bridge->topics);
}
struct ast_bridge *bridge_register(struct ast_bridge *bridge)
@@ -684,6 +687,13 @@
ast_uuid_generate_str(self->uniqueid, sizeof(self->uniqueid));
ast_set_flag(&self->feature_flags, flags);
self->allowed_capabilities = capabilities;
+
+ if (bridge_topics_init(self) != 0) {
+ ast_log(LOG_WARNING, "Bridge %s: Could not initialize topics\n",
+ self->uniqueid);
+ ao2_ref(self, -1);
+ return NULL;
+ }
/* Use our helper function to find the "best" bridge technology. */
self->technology = find_best_technology(capabilities, self);
@@ -4397,7 +4407,7 @@
struct ao2_iterator iter;
struct stasis_message *msg;
- if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+ if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
return NULL;
}
@@ -4435,7 +4445,7 @@
return NULL;
}
- if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+ if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached bridges\n");
return CLI_SUCCESS;
}
@@ -4467,7 +4477,7 @@
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot;
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) {
return 0;
}
snapshot = stasis_message_data(msg);
@@ -4500,7 +4510,7 @@
return CLI_SHOWUSAGE;
}
- msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]);
+ msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
if (!msg) {
ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
return CLI_SUCCESS;
Modified: trunk/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cdr.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/cdr.c (original)
+++ trunk/main/cdr.c Thu Aug 1 08:49:34 2013
@@ -4005,11 +4005,11 @@
return -1;
}
- channel_subscription = stasis_forward_all(stasis_caching_get_topic(ast_channel_topic_all_cached()), cdr_topic);
+ channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
}
- bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic);
+ bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
if (!bridge_subscription) {
return -1;
}
Modified: trunk/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cel.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/cel.c (original)
+++ trunk/main/cel.c Thu Aug 1 08:49:34 2013
@@ -1551,14 +1551,14 @@
}
cel_channel_forwarder = stasis_forward_all(
- stasis_caching_get_topic(ast_channel_topic_all_cached()),
+ ast_channel_topic_all_cached(),
cel_aggregation_topic);
if (!cel_channel_forwarder) {
return -1;
}
cel_bridge_forwarder = stasis_forward_all(
- stasis_caching_get_topic(ast_bridge_topic_all_cached()),
+ ast_bridge_topic_all_cached(),
cel_aggregation_topic);
if (!cel_bridge_forwarder) {
return -1;
Modified: trunk/main/channel_internal_api.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/channel_internal_api.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/channel_internal_api.c (original)
+++ trunk/main/channel_internal_api.c Thu Aug 1 08:49:34 2013
@@ -44,6 +44,7 @@
#include "asterisk/data.h"
#include "asterisk/endpoints.h"
#include "asterisk/indications.h"
+#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stringfields.h"
@@ -208,7 +209,7 @@
char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
- struct stasis_topic *topic; /*!< Topic for all channel's events */
+ struct stasis_cp_single *topics; /*!< Topic for all channel's events */
struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
@@ -1434,8 +1435,8 @@
chan->forwarder = stasis_unsubscribe(chan->forwarder);
chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
- ao2_cleanup(chan->topic);
- chan->topic = NULL;
+ stasis_cp_single_unsubscribe(chan->topics);
+ chan->topics = NULL;
}
void ast_channel_internal_finalize(struct ast_channel *chan)
@@ -1450,16 +1451,31 @@
struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
{
- return chan ? chan->topic : ast_channel_topic_all();
-}
-
-int ast_channel_forward_endpoint(struct ast_channel *chan, struct ast_endpoint *endpoint)
+ if (!chan) {
+ return ast_channel_topic_all();
+ }
+
+ return stasis_cp_single_topic(chan->topics);
+}
+
+struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan)
+{
+ if (!chan) {
+ return ast_channel_topic_all_cached();
+ }
+
+ return stasis_cp_single_topic_cached(chan->topics);
+}
+
+int ast_channel_forward_endpoint(struct ast_channel *chan,
+ struct ast_endpoint *endpoint)
{
ast_assert(chan != NULL);
ast_assert(endpoint != NULL);
chan->endpoint_forward =
- stasis_forward_all(chan->topic, ast_endpoint_topic(endpoint));
+ stasis_forward_all(ast_channel_topic(chan),
+ ast_endpoint_topic(endpoint));
if (chan->endpoint_forward == NULL) {
return -1;
@@ -1468,19 +1484,21 @@
return 0;
}
-void ast_channel_internal_setup_topics(struct ast_channel *chan)
+int ast_channel_internal_setup_topics(struct ast_channel *chan)
{
const char *topic_name = chan->uniqueid;
- ast_assert(chan->topic == NULL);
- ast_assert(chan->forwarder == NULL);
+ ast_assert(chan->topics == NULL);
if (ast_strlen_zero(topic_name)) {
topic_name = "<dummy-channel>";
}
- chan->topic = stasis_topic_create(topic_name);
- chan->forwarder = stasis_forward_all(chan->topic, ast_channel_topic_all());
-
- ast_assert(chan->topic != NULL);
- ast_assert(chan->forwarder != NULL);
-}
+ chan->topics = stasis_cp_single_create(
+ ast_channel_cache_all(), topic_name);
+
+ if (!chan->topics) {
+ return -1;
+ }
+
+ return 0;
+}
Modified: trunk/main/cli.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/cli.c?view=diff&rev=395954&r1=395953&r2=395954
==============================================================================
--- trunk/main/cli.c (original)
+++ trunk/main/cli.c Thu Aug 1 08:49:34 2013
@@ -915,7 +915,7 @@
return CLI_SHOWUSAGE;
- if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached channels\n");
return CLI_SUCCESS;
}
@@ -1438,7 +1438,7 @@
now = ast_tvnow();
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]);
[... 1866 lines stripped ...]
More information about the asterisk-commits
mailing list