[svn-commits] dlee: branch dlee/stasis-cache-split r393967 - in /team/dlee/stasis-cache-spl...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Tue Jul 9 22:07:33 CDT 2013
Author: dlee
Date: Tue Jul 9 22:07:29 2013
New Revision: 393967
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=393967
Log:
Compiles, although linking is a bit of a problem
Added:
team/dlee/stasis-cache-split/main/stasis_wait.c (with props)
Modified:
team/dlee/stasis-cache-split/include/asterisk/devicestate.h
team/dlee/stasis-cache-split/include/asterisk/presencestate.h
team/dlee/stasis-cache-split/include/asterisk/stasis.h
team/dlee/stasis-cache-split/main/app.c
team/dlee/stasis-cache-split/main/bridging.c
team/dlee/stasis-cache-split/main/cdr.c
team/dlee/stasis-cache-split/main/cel.c
team/dlee/stasis-cache-split/main/cli.c
team/dlee/stasis-cache-split/main/devicestate.c
team/dlee/stasis-cache-split/main/manager.c
team/dlee/stasis-cache-split/main/manager_bridging.c
team/dlee/stasis-cache-split/main/manager_channels.c
team/dlee/stasis-cache-split/main/manager_endpoints.c
team/dlee/stasis-cache-split/main/pbx.c
team/dlee/stasis-cache-split/main/presencestate.c
team/dlee/stasis-cache-split/main/stasis.c
team/dlee/stasis-cache-split/main/stasis_bridging.c
team/dlee/stasis-cache-split/main/stasis_cache.c
team/dlee/stasis-cache-split/main/stasis_channels.c
team/dlee/stasis-cache-split/main/stasis_endpoints.c
team/dlee/stasis-cache-split/tests/test_cel.c
Modified: team/dlee/stasis-cache-split/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/devicestate.h?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/devicestate.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/devicestate.h Tue Jul 9 22:07:29 2013
@@ -310,6 +310,13 @@
struct stasis_caching_topic *ast_device_state_topic_cached(void);
/*!
+ * \brief Backend cache for ast_device_state_topic_cached()
+ * \retval Cache of \ref ast_device_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_device_state_cache(void);
+
+/*!
* \brief Get the Stasis message type for device state messages
* \retval The message type for device state messages
* \retval NULL if it has not been allocated
Modified: team/dlee/stasis-cache-split/include/asterisk/presencestate.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/presencestate.h?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/presencestate.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/presencestate.h Tue Jul 9 22:07:29 2013
@@ -168,7 +168,14 @@
* \retval Caching Stasis topic for presence state messages
* \since 12
*/
-struct stasis_caching_topic *ast_presence_state_topic_cached(void);
+struct stasis_topic *ast_presence_state_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_presence_state_topic_cached()
+ * \retval Cache of \ref ast_presence_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_presence_state_cache(void);
/*!
* \brief Stasis message payload representing a presence state update
Modified: team/dlee/stasis-cache-split/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/stasis.h?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/stasis.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/stasis.h Tue Jul 9 22:07:29 2013
@@ -345,6 +345,15 @@
struct stasis_topic *publisher_topic,
struct stasis_message *message);
+/*!
+ * \brief Wait for all pending messages on a given topic to be processed.
+ * \param topic Topic to await pending messages on.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_topic_wait(struct stasis_topic *topic);
+
/*! @} */
/*! @{ */
@@ -716,24 +725,9 @@
* \return \c NULL if message is not found.
* \since 12
*/
-#define stasis_cache_get(cache, type, id) \
- stasis_cache_get_extended(cache, type, id, 0)
-
-/*!
- * \brief Retrieve an item from the cache.
- * \param cache The cache to query.
- * \param type Type of message to retrieve.
- * \param id Identity of the snapshot to retrieve.
- * \param guaranteed If set to 1 it is guaranteed that any pending messages
- * have been processed.
- * \return Message from the cache. The cache still owns the message, so
- * ao2_ref() if you want to keep it.
- * \return \c NULL if message is not found.
- * \since 12
- */
-struct stasis_message *stasis_cache_get_extended(
+struct stasis_message *stasis_cache_get(
struct stasis_cache *cache, struct stasis_message_type *type,
- const char *id, unsigned int guaranteed);
+ const char *id);
/*!
* \brief Dump cached items to a subscription
@@ -878,12 +872,17 @@
/*!
* \internal
- * \brief called by stasis_init for config initialization.
+ * \brief called by stasis_init() for config initialization.
* \return 0 on success.
* \return Non-zero on error.
* \since 12
*/
int stasis_config_init(void);
+
+/*!
+ * \internal
+ */
+int stasis_wait_init(void);
struct ast_threadpool_options;
Modified: team/dlee/stasis-cache-split/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/app.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/app.c (original)
+++ team/dlee/stasis-cache-split/main/app.c Tue Jul 9 22:07:29 2013
@@ -88,6 +88,7 @@
* @{ \brief Define \ref stasis topic objects for MWI
*/
static struct stasis_topic *mwi_topic_all;
+static struct stasis_cache *mwi_state_cache;
static struct stasis_caching_topic *mwi_topic_cached;
static struct stasis_topic_pool *mwi_topic_pool;
/* @} */
@@ -2696,6 +2697,11 @@
return mwi_topic_all;
}
+struct stasis_cache *ast_mwi_state_cache(void)
+{
+ return mwi_state_cache;
+}
+
struct stasis_topic *ast_mwi_topic_cached(void)
{
return stasis_caching_get_topic(mwi_topic_cached);
@@ -2754,7 +2760,7 @@
if (!ast_strlen_zero(channel_id)) {
RAII_VAR(struct stasis_message *, chan_message,
- stasis_cache_get(ast_channel_topic_all_cached(),
+ stasis_cache_get(ast_channel_cache(),
ast_channel_snapshot_type(),
channel_id),
ao2_cleanup);
@@ -2855,7 +2861,11 @@
if (!mwi_topic_all) {
return -1;
}
- mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+ mwi_state_cache = stasis_cache_create(mwi_state_get_id);
+ if (!mwi_state_cache) {
+ return -1;
+ }
+ mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
if (!mwi_topic_cached) {
return -1;
}
Modified: team/dlee/stasis-cache-split/main/bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/bridging.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/bridging.c (original)
+++ team/dlee/stasis-cache-split/main/bridging.c Tue Jul 9 22:07:29 2013
@@ -6893,7 +6893,7 @@
struct ao2_iterator iter;
struct stasis_message *msg;
- if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+ if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
return NULL;
}
@@ -6931,7 +6931,7 @@
return NULL;
}
- if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+ if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached bridges\n");
return CLI_SUCCESS;
}
@@ -6963,7 +6963,7 @@
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot;
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) {
return 0;
}
snapshot = stasis_message_data(msg);
@@ -6996,7 +6996,7 @@
return CLI_SHOWUSAGE;
}
- msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]);
+ msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
if (!msg) {
ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
return CLI_SUCCESS;
Modified: team/dlee/stasis-cache-split/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/cdr.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/cdr.c (original)
+++ team/dlee/stasis-cache-split/main/cdr.c Tue Jul 9 22:07:29 2013
@@ -4026,11 +4026,11 @@
return -1;
}
- channel_subscription = stasis_forward_all(ast_channel_cache(), cdr_topic);
+ channel_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
}
- bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic);
+ bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
if (!bridge_subscription) {
return -1;
}
Modified: team/dlee/stasis-cache-split/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/cel.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/cel.c (original)
+++ team/dlee/stasis-cache-split/main/cel.c Tue Jul 9 22:07:29 2013
@@ -1463,14 +1463,14 @@
}
cel_channel_forwarder = stasis_forward_all(
- ast_channel_cache(),
+ ast_channel_topic_all_cached(),
cel_aggregation_topic);
if (!cel_channel_forwarder) {
return -1;
}
cel_bridge_forwarder = stasis_forward_all(
- stasis_caching_get_topic(ast_bridge_topic_all_cached()),
+ ast_bridge_topic_all_cached(),
cel_aggregation_topic);
if (!cel_bridge_forwarder) {
return -1;
Modified: team/dlee/stasis-cache-split/main/cli.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/cli.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/cli.c (original)
+++ team/dlee/stasis-cache-split/main/cli.c Tue Jul 9 22:07:29 2013
@@ -915,7 +915,7 @@
return CLI_SHOWUSAGE;
- if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached channels\n");
return CLI_SUCCESS;
}
@@ -1438,7 +1438,7 @@
now = ast_tvnow();
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]);
return CLI_SUCCESS;
}
@@ -1571,7 +1571,7 @@
return NULL;
}
- if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached(), ast_channel_snapshot_type()))) {
+ if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) {
return NULL;
}
Modified: team/dlee/stasis-cache-split/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/devicestate.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/devicestate.c (original)
+++ team/dlee/stasis-cache-split/main/devicestate.c Tue Jul 9 22:07:29 2013
@@ -196,6 +196,7 @@
struct stasis_subscription *devstate_message_sub;
static struct stasis_topic *device_state_topic_all;
+static struct stasis_cache *device_state_cache;
static struct stasis_caching_topic *device_state_topic_cached;
static struct stasis_topic_pool *device_state_topic_pool;
@@ -285,7 +286,7 @@
RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
struct ast_device_state_message *device_state;
- cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+ cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
if (!cached_msg) {
return AST_DEVICE_UNKNOWN;
}
@@ -586,7 +587,7 @@
ast_devstate_aggregate_init(&aggregate);
- cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
+ cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
@@ -598,7 +599,7 @@
RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
struct ast_device_state_message *cached_aggregate_device_state;
- cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+ cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
if (!cached_aggregate_msg) {
return 1;
}
@@ -719,6 +720,11 @@
return device_state_topic_all;
}
+struct stasis_cache *ast_device_state_cache(void)
+{
+ return device_state_cache;
+}
+
struct stasis_caching_topic *ast_device_state_topic_cached(void)
{
return device_state_topic_cached;
@@ -777,6 +783,8 @@
devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
+ ao2_cleanup(device_state_cache);
+ device_state_cache = NULL;
device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
ao2_cleanup(device_state_topic_pool);
@@ -794,7 +802,11 @@
if (!device_state_topic_all) {
return -1;
}
- device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id);
+ device_state_cache = stasis_cache_create(device_state_get_id);
+ if (!device_state_cache) {
+ return -1;
+ }
+ device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
if (!device_state_topic_cached) {
return -1;
}
Modified: team/dlee/stasis-cache-split/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager.c (original)
+++ team/dlee/stasis-cache-split/main/manager.c Tue Jul 9 22:07:29 2013
@@ -3871,7 +3871,7 @@
}
if (all) {
- if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(cached_channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
ast_free(str);
astman_send_error(s, m, "Memory Allocation Failure");
return 1;
@@ -3879,7 +3879,7 @@
it_chans = ao2_iterator_init(cached_channels, 0);
msg = ao2_iterator_next(&it_chans);
} else {
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), name))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name))) {
astman_send_error(s, m, "No such channel");
ast_free(str);
return 0;
@@ -5353,7 +5353,7 @@
idText[0] = '\0';
}
- if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+ if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
astman_send_error(s, m, "Could not get cached channels");
return 0;
}
Modified: team/dlee/stasis-cache-split/main/manager_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager_bridging.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager_bridging.c (original)
+++ team/dlee/stasis-cache-split/main/manager_bridging.c Tue Jul 9 22:07:29 2013
@@ -338,7 +338,7 @@
ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
}
- bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type());
+ bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
if (!bridges) {
astman_send_error(s, m, "Internal error");
return -1;
@@ -401,7 +401,7 @@
ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
}
- msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), bridge_uniqueid);
+ msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid);
if (!msg) {
astman_send_error(s, m, "Specified BridgeUniqueid not found");
return -1;
@@ -458,7 +458,7 @@
return -1;
}
- bridge_topic = stasis_caching_get_topic(ast_bridge_topic_all_cached());
+ bridge_topic = ast_bridge_topic_all_cached();
if (!bridge_topic) {
return -1;
}
Modified: team/dlee/stasis-cache-split/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager_channels.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager_channels.c (original)
+++ team/dlee/stasis-cache-split/main/manager_channels.c Tue Jul 9 22:07:29 2013
@@ -1259,7 +1259,7 @@
if (!message_router) {
return -1;
}
- channel_topic = ast_channel_cache();
+ channel_topic = ast_channel_topic_all_cached();
if (!channel_topic) {
return -1;
}
Modified: team/dlee/stasis-cache-split/main/manager_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager_endpoints.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager_endpoints.c (original)
+++ team/dlee/stasis-cache-split/main/manager_endpoints.c Tue Jul 9 22:07:29 2013
@@ -53,7 +53,7 @@
* topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
* to dig to make sure I don't break anything, though.
*/
- stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached), message);
+ stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
}
int manager_endpoints_init(void)
Modified: team/dlee/stasis-cache-split/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/pbx.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/pbx.c (original)
+++ team/dlee/stasis-cache-split/main/pbx.c Tue Jul 9 22:07:29 2013
@@ -8043,7 +8043,7 @@
if (a->argc != e->args + 1)
return CLI_SHOWUSAGE;
- if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+ if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
ast_cli(a->fd, "Channel '%s' not found\n", a->argv[e->args]);
return CLI_FAILURE;
}
Modified: team/dlee/stasis-cache-split/main/presencestate.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/presencestate.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/presencestate.c (original)
+++ team/dlee/stasis-cache-split/main/presencestate.c Tue Jul 9 22:07:29 2013
@@ -55,6 +55,7 @@
STASIS_MESSAGE_TYPE_DEFN(ast_presence_state_message_type);
struct stasis_topic *presence_state_topic_all;
+struct stasis_cache *presence_state_cache;
struct stasis_caching_topic *presence_state_topic_cached;
/*! \brief A presence state provider */
@@ -95,7 +96,7 @@
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_presence_state_message *presence_state;
- msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider);
+ msg = stasis_cache_get(ast_presence_state_cache(), ast_presence_state_message_type(), presence_provider);
if (!msg) {
return res;
@@ -294,9 +295,14 @@
return presence_state_topic_all;
}
-struct stasis_caching_topic *ast_presence_state_topic_cached(void)
-{
- return presence_state_topic_cached;
+struct stasis_cache *ast_presence_state_cache(void)
+{
+ return presence_state_cache;
+}
+
+struct stasis_topic *ast_presence_state_topic_cached(void)
+{
+ return stasis_caching_get_topic(presence_state_topic_cached);
}
static const char *presence_state_get_id(struct stasis_message *msg)
@@ -314,6 +320,8 @@
{
ao2_cleanup(presence_state_topic_all);
presence_state_topic_all = NULL;
+ ao2_cleanup(presence_state_cache);
+ presence_state_cache = NULL;
presence_state_topic_cached = stasis_caching_unsubscribe_and_join(presence_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_presence_state_message_type);
}
@@ -331,7 +339,12 @@
return -1;
}
- presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id);
+ presence_state_cache = stasis_cache_create(presence_state_get_id);
+ if (!presence_state_cache) {
+ return -1;
+ }
+
+ presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_cache);
if (!presence_state_topic_cached) {
return -1;
}
Modified: team/dlee/stasis-cache-split/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis.c (original)
+++ team/dlee/stasis-cache-split/main/stasis.c Tue Jul 9 22:07:29 2013
@@ -653,6 +653,11 @@
return -1;
}
+ if (stasis_wait_init() != 0) {
+ ast_log(LOG_ERROR, "Stasis initialization failed\n");
+ return -1;
+ }
+
if (pool) {
ast_log(LOG_ERROR, "Stasis double-initialized\n");
return -1;
Modified: team/dlee/stasis-cache-split/main/stasis_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_bridging.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_bridging.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_bridging.c Tue Jul 9 22:07:29 2013
@@ -367,15 +367,6 @@
STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
/*! @} */
-/*! \brief Aggregate topic for bridge messages */
-static struct stasis_topic *bridge_topic_all;
-
-/*! \brief Caching aggregate topic for bridge snapshots */
-static struct stasis_caching_topic *bridge_topic_all_cached;
-
-/*! \brief Topic pool for individual bridge topics */
-static struct stasis_topic_pool *bridge_topic_pool;
-
/*! \brief Destructor for bridge snapshots */
static void bridge_snapshot_dtor(void *obj)
{
@@ -420,25 +411,6 @@
return snapshot;
}
-struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
-{
- struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
- if (!bridge_topic) {
- return ast_bridge_topic_all();
- }
- return bridge_topic;
-}
-
-struct stasis_topic *ast_bridge_topic_all(void)
-{
- return bridge_topic_all;
-}
-
-struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
-{
- return bridge_topic_all_cached;
-}
-
void ast_bridge_publish_state(struct ast_bridge *bridge)
{
RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -459,7 +431,8 @@
stasis_publish(ast_bridge_topic(bridge), msg);
}
-static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
+static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
+ struct ast_bridge_blob *obj)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
@@ -470,7 +443,7 @@
return;
}
- stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
+ stasis_publish(ast_bridge_topic(bridge), msg);
}
/*! \brief Destructor for bridge merge messages */
@@ -592,7 +565,7 @@
/* enter blob first, then state */
stasis_publish(ast_bridge_topic(bridge), msg);
- bridge_publish_state_from_blob(stasis_message_data(msg));
+ bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
}
void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
@@ -605,7 +578,7 @@
}
/* state first, then leave blob (opposite of enter, preserves nesting of events) */
- bridge_publish_state_from_blob(stasis_message_data(msg));
+ bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
stasis_publish(ast_bridge_topic(bridge), msg);
}
@@ -1000,7 +973,7 @@
ast_assert(!ast_strlen_zero(uniqueid));
- message = stasis_cache_get(ast_bridge_topic_all_cached(),
+ message = stasis_cache_get(ast_bridge_cache(),
ast_bridge_snapshot_type(),
uniqueid);
if (!message) {
@@ -1017,13 +990,6 @@
static void stasis_bridging_cleanup(void)
{
- ao2_cleanup(bridge_topic_all);
- bridge_topic_all = NULL;
- bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
- bridge_topic_all_cached);
- ao2_cleanup(bridge_topic_pool);
- bridge_topic_pool = NULL;
-
STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
@@ -1033,7 +999,7 @@
}
/*! \brief snapshot ID getter for caching topic */
-static const char *bridge_snapshot_get_id(struct stasis_message *msg)
+__attribute__((unused)) static const char *bridge_snapshot_get_id(struct stasis_message *msg)
{
struct ast_bridge_snapshot *snapshot;
if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
@@ -1053,11 +1019,6 @@
STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type);
STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type);
- bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
- bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
- bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
-
- return !bridge_topic_all
- || !bridge_topic_all_cached
- || !bridge_topic_pool ? -1 : 0;
-}
+
+ return 0;
+}
Modified: team/dlee/stasis-cache-split/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_cache.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_cache.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_cache.c Tue Jul 9 22:07:29 2013
@@ -44,15 +44,18 @@
#endif
/*! \internal */
+struct stasis_cache {
+ struct ao2_container *entries;
+ snapshot_get_id id_fn;
+};
+
+/*! \internal */
struct stasis_caching_topic {
- struct ao2_container *cache;
+ struct stasis_cache *cache;
struct stasis_topic *topic;
struct stasis_topic *original_topic;
struct stasis_subscription *sub;
- snapshot_get_id id_fn;
};
-
-static struct stasis_message_type *cache_guarantee_type(void);
static void stasis_caching_topic_dtor(void *obj) {
struct stasis_caching_topic *caching_topic = obj;
@@ -173,28 +176,80 @@
return 0;
}
-static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+static void cache_dtor(void *obj)
+{
+ struct stasis_cache *cache = obj;
+
+ ao2_cleanup(cache->entries);
+ cache->entries = NULL;
+}
+
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+{
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+
+ cache = ao2_alloc(sizeof(*cache), cache_dtor);
+ if (!cache) {
+ return NULL;
+ }
+
+ cache->entries = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash,
+ cache_entry_cmp);
+ if (!cache->entries) {
+ return NULL;
+ }
+
+ cache->id_fn = id_fn;
+
+ ao2_ref(cache, +1);
+ return cache;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache,
+ struct stasis_message_type *type, const char *id)
+{
+ RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
+ RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+
+ search_entry = cache_entry_create(type, id, NULL);
+ if (search_entry == NULL) {
+ return NULL;
+ }
+
+ cached_entry = ao2_find(cache->entries, search_entry, OBJ_POINTER);
+ if (cached_entry == NULL) {
+ return NULL;
+ }
+
+ ast_assert(cached_entry->snapshot != NULL);
+ ao2_ref(cached_entry->snapshot, +1);
+ return cached_entry->snapshot;
+}
+
+static struct stasis_message *cache_put(struct stasis_cache *cache,
+ struct stasis_message_type *type, const char *id,
+ struct stasis_message *new_snapshot)
{
RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
struct stasis_message *old_snapshot = NULL;
- ast_assert(caching_topic->cache != NULL);
+ ast_assert(cache->entries != NULL);
new_entry = cache_entry_create(type, id, new_snapshot);
if (new_snapshot == NULL) {
/* Remove entry from cache */
- cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+ cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_UNLINK);
if (cached_entry) {
old_snapshot = cached_entry->snapshot;
cached_entry->snapshot = NULL;
}
} else {
/* Insert/update cache */
- SCOPED_AO2LOCK(lock, caching_topic->cache);
-
- cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+ SCOPED_AO2LOCK(lock, cache->entries);
+
+ cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_NOLOCK);
if (cached_entry) {
/* Update cache. Because objects are moving, no need to update refcounts. */
old_snapshot = cached_entry->snapshot;
@@ -202,83 +257,12 @@
new_entry->snapshot = NULL;
} else {
/* Insert into the cache */
- ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+ ao2_link_flags(cache->entries, new_entry, OBJ_NOLOCK);
}
}
return old_snapshot;
-}
-
-/*! \internal */
-struct caching_guarantee {
- ast_mutex_t lock;
- ast_cond_t cond;
- unsigned int done:1;
-};
-
-static void caching_guarantee_dtor(void *obj)
-{
- struct caching_guarantee *guarantee = obj;
-
- ast_assert(guarantee->done == 1);
-
- ast_mutex_destroy(&guarantee->lock);
- ast_cond_destroy(&guarantee->cond);
-}
-
-static struct stasis_message *caching_guarantee_create(void)
-{
- RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
- if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
- return NULL;
- }
-
- ast_mutex_init(&guarantee->lock);
- ast_cond_init(&guarantee->cond, NULL);
-
- if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
- return NULL;
- }
-
- ao2_ref(msg, +1);
- return msg;
-}
-
-struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, unsigned int guaranteed)
-{
- RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
- RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
-
- ast_assert(caching_topic->cache != NULL);
-
- if (guaranteed) {
- RAII_VAR(struct stasis_message *, msg, caching_guarantee_create(), ao2_cleanup);
- struct caching_guarantee *guarantee = stasis_message_data(msg);
-
- ast_mutex_lock(&guarantee->lock);
- stasis_publish(caching_topic->original_topic, msg);
- while (!guarantee->done) {
- ast_cond_wait(&guarantee->cond, &guarantee->lock);
- }
- ast_mutex_unlock(&guarantee->lock);
- }
-
- search_entry = cache_entry_create(type, id, NULL);
- if (search_entry == NULL) {
- return NULL;
- }
-
- cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
- if (cached_entry == NULL) {
- return NULL;
- }
-
- ast_assert(cached_entry->snapshot != NULL);
- ao2_ref(cached_entry->snapshot, +1);
- return cached_entry->snapshot;
}
struct cache_dump_data {
@@ -298,11 +282,11 @@
return 0;
}
-struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
{
struct cache_dump_data cache_dump;
- ast_assert(caching_topic->cache != NULL);
+ ast_assert(cache->entries != NULL);
cache_dump.type = type;
cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
@@ -310,13 +294,12 @@
return NULL;
}
- ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
+ ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
return cache_dump.cached;
}
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
-STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
{
@@ -390,22 +373,10 @@
const char *id = NULL;
ast_assert(caching_topic->topic != NULL);
- ast_assert(caching_topic->id_fn != NULL);
+ ast_assert(caching_topic->cache->id_fn != NULL);
if (stasis_subscription_final_message(sub, message)) {
caching_topic_needs_unref = caching_topic;
- }
-
- /* Handle cache guarantee event */
- if (cache_guarantee_type() == stasis_message_type(message)) {
- struct caching_guarantee *guarantee = stasis_message_data(message);
-
- ast_mutex_lock(&guarantee->lock);
- guarantee->done = 1;
- ast_cond_signal(&guarantee->cond);
- ast_mutex_unlock(&guarantee->lock);
-
- return;
}
/* Handle cache clear event */
@@ -413,13 +384,13 @@
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
struct stasis_message *clear_msg = stasis_message_data(message);
- const char *clear_id = caching_topic->id_fn(clear_msg);
+ const char *clear_id = caching_topic->cache->id_fn(clear_msg);
struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
ast_assert(clear_type != NULL);
if (clear_id) {
- old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL);
+ old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
if (old_snapshot) {
update = update_create(topic, old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
@@ -433,7 +404,7 @@
}
}
- id = caching_topic->id_fn(message);
+ id = caching_topic->cache->id_fn(message);
if (id == NULL) {
/* Object isn't cached; forward */
stasis_forward_message(caching_topic->topic, topic, message);
@@ -442,7 +413,7 @@
RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
- old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+ old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
update = update_create(topic, old_snapshot, message);
if (update == NULL) {
@@ -457,7 +428,8 @@
}
}
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+struct stasis_caching_topic *stasis_caching_topic_create(
+ struct stasis_topic *original_topic, struct stasis_cache *cache)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
struct stasis_subscription *sub;
@@ -474,18 +446,13 @@
return NULL;
}
- caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
- if (!caching_topic->cache) {
- ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
- return NULL;
- }
+ ao2_ref(cache, +1);
+ caching_topic->cache = cache;
caching_topic->topic = stasis_topic_create(new_name);
if (caching_topic->topic == NULL) {
return NULL;
}
-
- caching_topic->id_fn = id_fn;
sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
if (sub == NULL) {
@@ -507,7 +474,6 @@
{
STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
- STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
}
int stasis_cache_init(void)
@@ -522,10 +488,6 @@
return -1;
}
- if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
- return -1;
- }
-
return 0;
}
Modified: team/dlee/stasis-cache-split/main/stasis_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_channels.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_channels.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_channels.c Tue Jul 9 22:07:29 2013
@@ -59,26 +59,7 @@
#define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
-/*! \brief Topic for all channels */
-struct stasis_topic *channel_topic_all;
-
-/*! \brief Caching topic for all channels */
-struct stasis_caching_topic *channel_topic_all_cached;
-
-/*! \brief Caching topic for all channels indexed by name */
-struct stasis_caching_topic *channel_topic_all_cached_by_name;
-
-struct stasis_topic *ast_channel_topic_all(void)
-{
- return channel_topic_all;
-}
-
-struct stasis_caching_topic *ast_channel_topic_all_cached(void)
-{
- return channel_topic_all_cached;
-}
-
-static const char *channel_snapshot_get_id(struct stasis_message *message)
+__attribute__((unused)) static const char *channel_snapshot_get_id(struct stasis_message *message)
{
struct ast_channel_snapshot *snapshot;
if (ast_channel_snapshot_type() != stasis_message_type(message)) {
@@ -88,12 +69,7 @@
return snapshot->uniqueid;
}
-struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void)
-{
- return channel_topic_all_cached_by_name;
-}
-
-static const char *channel_snapshot_get_name(struct stasis_message *message)
+__attribute__((unused)) static const char *channel_snapshot_get_name(struct stasis_message *message)
{
struct ast_channel_snapshot *snapshot;
if (ast_channel_snapshot_type() != stasis_message_type(message)) {
@@ -431,7 +407,7 @@
ast_assert(!ast_strlen_zero(uniqueid));
- message = stasis_cache_get(ast_channel_topic_all_cached(),
+ message = stasis_cache_get(ast_channel_cache(),
ast_channel_snapshot_type(),
uniqueid);
if (!message) {
@@ -453,7 +429,7 @@
ast_assert(!ast_strlen_zero(name));
- message = stasis_cache_get(ast_channel_topic_all_cached_by_name(),
+ message = stasis_cache_get(ast_channel_cache_by_name(),
ast_channel_snapshot_type(),
name);
if (!message) {
@@ -832,10 +808,6 @@
static void stasis_channels_cleanup(void)
{
- channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
- channel_topic_all_cached_by_name = stasis_caching_unsubscribe_and_join(channel_topic_all_cached_by_name);
- ao2_cleanup(channel_topic_all);
- channel_topic_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
[... 260 lines stripped ...]
More information about the svn-commits mailing list