[svn-commits] dlee: branch dlee/stasis-cache-split r393967 - in /team/dlee/stasis-cache-spl...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Tue Jul 9 22:07:33 CDT 2013


Author: dlee
Date: Tue Jul  9 22:07:29 2013
New Revision: 393967

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=393967
Log:
Compiles, although linking is a bit of a problem

Added:
    team/dlee/stasis-cache-split/main/stasis_wait.c   (with props)
Modified:
    team/dlee/stasis-cache-split/include/asterisk/devicestate.h
    team/dlee/stasis-cache-split/include/asterisk/presencestate.h
    team/dlee/stasis-cache-split/include/asterisk/stasis.h
    team/dlee/stasis-cache-split/main/app.c
    team/dlee/stasis-cache-split/main/bridging.c
    team/dlee/stasis-cache-split/main/cdr.c
    team/dlee/stasis-cache-split/main/cel.c
    team/dlee/stasis-cache-split/main/cli.c
    team/dlee/stasis-cache-split/main/devicestate.c
    team/dlee/stasis-cache-split/main/manager.c
    team/dlee/stasis-cache-split/main/manager_bridging.c
    team/dlee/stasis-cache-split/main/manager_channels.c
    team/dlee/stasis-cache-split/main/manager_endpoints.c
    team/dlee/stasis-cache-split/main/pbx.c
    team/dlee/stasis-cache-split/main/presencestate.c
    team/dlee/stasis-cache-split/main/stasis.c
    team/dlee/stasis-cache-split/main/stasis_bridging.c
    team/dlee/stasis-cache-split/main/stasis_cache.c
    team/dlee/stasis-cache-split/main/stasis_channels.c
    team/dlee/stasis-cache-split/main/stasis_endpoints.c
    team/dlee/stasis-cache-split/tests/test_cel.c

Modified: team/dlee/stasis-cache-split/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/devicestate.h?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/devicestate.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/devicestate.h Tue Jul  9 22:07:29 2013
@@ -310,6 +310,13 @@
 struct stasis_caching_topic *ast_device_state_topic_cached(void);
 
 /*!
+ * \brief Backend cache for ast_device_state_topic_cached()
+ * \retval Cache of \ref ast_device_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_device_state_cache(void);
+
+/*!
  * \brief Get the Stasis message type for device state messages
  * \retval The message type for device state messages
  * \retval NULL if it has not been allocated

Modified: team/dlee/stasis-cache-split/include/asterisk/presencestate.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/presencestate.h?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/presencestate.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/presencestate.h Tue Jul  9 22:07:29 2013
@@ -168,7 +168,14 @@
  * \retval Caching Stasis topic for presence state messages
  * \since 12
  */
-struct stasis_caching_topic *ast_presence_state_topic_cached(void);
+struct stasis_topic *ast_presence_state_topic_cached(void);
+
+/*!
+ * \brief Backend cache for ast_presence_state_topic_cached()
+ * \retval Cache of \ref ast_presence_state_message.
+ * \since 12
+ */
+struct stasis_cache *ast_presence_state_cache(void);
 
 /*!
  * \brief Stasis message payload representing a presence state update

Modified: team/dlee/stasis-cache-split/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/include/asterisk/stasis.h?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/include/asterisk/stasis.h (original)
+++ team/dlee/stasis-cache-split/include/asterisk/stasis.h Tue Jul  9 22:07:29 2013
@@ -345,6 +345,15 @@
 			    struct stasis_topic *publisher_topic,
 			    struct stasis_message *message);
 
+/*!
+ * \brief Wait for all pending messages on a given topic to be processed.
+ * \param topic Topic to await pending messages on.
+ * \return 0 on success.
+ * \return Non-zero on error.
+ * \since 12
+ */
+int stasis_topic_wait(struct stasis_topic *topic);
+
 /*! @} */
 
 /*! @{ */
@@ -716,24 +725,9 @@
  * \return \c NULL if message is not found.
  * \since 12
  */
-#define stasis_cache_get(cache, type, id)		\
-	stasis_cache_get_extended(cache, type, id, 0)
-
-/*!
- * \brief Retrieve an item from the cache.
- * \param cache The cache to query.
- * \param type Type of message to retrieve.
- * \param id Identity of the snapshot to retrieve.
- * \param guaranteed If set to 1 it is guaranteed that any pending messages
- *                   have been processed.
- * \return Message from the cache. The cache still owns the message, so
- *         ao2_ref() if you want to keep it.
- * \return \c NULL if message is not found.
- * \since 12
- */
-struct stasis_message *stasis_cache_get_extended(
+struct stasis_message *stasis_cache_get(
 	struct stasis_cache *cache, struct stasis_message_type *type,
-	const char *id, unsigned int guaranteed);
+	const char *id);
 
 /*!
  * \brief Dump cached items to a subscription
@@ -878,12 +872,17 @@
 
 /*!
  * \internal
- * \brief called by stasis_init for config initialization.
+ * \brief called by stasis_init() for config initialization.
  * \return 0 on success.
  * \return Non-zero on error.
  * \since 12
  */
 int stasis_config_init(void);
+
+/*!
+ * \internal
+ */
+int stasis_wait_init(void);
 
 struct ast_threadpool_options;
 

Modified: team/dlee/stasis-cache-split/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/app.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/app.c (original)
+++ team/dlee/stasis-cache-split/main/app.c Tue Jul  9 22:07:29 2013
@@ -88,6 +88,7 @@
  * @{ \brief Define \ref stasis topic objects for MWI
  */
 static struct stasis_topic *mwi_topic_all;
+static struct stasis_cache *mwi_state_cache;
 static struct stasis_caching_topic *mwi_topic_cached;
 static struct stasis_topic_pool *mwi_topic_pool;
 /* @} */
@@ -2696,6 +2697,11 @@
 	return mwi_topic_all;
 }
 
+struct stasis_cache *ast_mwi_state_cache(void)
+{
+	return mwi_state_cache;
+}
+
 struct stasis_topic *ast_mwi_topic_cached(void)
 {
 	return stasis_caching_get_topic(mwi_topic_cached);
@@ -2754,7 +2760,7 @@
 
 	if (!ast_strlen_zero(channel_id)) {
 		RAII_VAR(struct stasis_message *, chan_message,
-			stasis_cache_get(ast_channel_topic_all_cached(),
+			stasis_cache_get(ast_channel_cache(),
 					ast_channel_snapshot_type(),
 					channel_id),
 			ao2_cleanup);
@@ -2855,7 +2861,11 @@
 	if (!mwi_topic_all) {
 		return -1;
 	}
-	mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+	mwi_state_cache = stasis_cache_create(mwi_state_get_id);
+	if (!mwi_state_cache) {
+		return -1;
+	}
+	mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
 	if (!mwi_topic_cached) {
 		return -1;
 	}

Modified: team/dlee/stasis-cache-split/main/bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/bridging.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/bridging.c (original)
+++ team/dlee/stasis-cache-split/main/bridging.c Tue Jul  9 22:07:29 2013
@@ -6893,7 +6893,7 @@
 	struct ao2_iterator iter;
 	struct stasis_message *msg;
 
-	if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+	if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
 		return NULL;
 	}
 
@@ -6931,7 +6931,7 @@
 		return NULL;
 	}
 
-	if (!(cached_bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type()))) {
+	if (!(cached_bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type()))) {
 		ast_cli(a->fd, "Failed to retrieve cached bridges\n");
 		return CLI_SUCCESS;
 	}
@@ -6963,7 +6963,7 @@
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	struct ast_channel_snapshot *snapshot;
 
-	if (!(msg = stasis_cache_get(ast_channel_topic_all_cached(), ast_channel_snapshot_type(), uniqueid))) {
+	if (!(msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid))) {
 		return 0;
 	}
 	snapshot = stasis_message_data(msg);
@@ -6996,7 +6996,7 @@
 		return CLI_SHOWUSAGE;
 	}
 
-	msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), a->argv[2]);
+	msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), a->argv[2]);
 	if (!msg) {
 		ast_cli(a->fd, "Bridge '%s' not found\n", a->argv[2]);
 		return CLI_SUCCESS;

Modified: team/dlee/stasis-cache-split/main/cdr.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/cdr.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/cdr.c (original)
+++ team/dlee/stasis-cache-split/main/cdr.c Tue Jul  9 22:07:29 2013
@@ -4026,11 +4026,11 @@
 		return -1;
 	}
 
-	channel_subscription = stasis_forward_all(ast_channel_cache(), cdr_topic);
+	channel_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
 	if (!channel_subscription) {
 		return -1;
 	}
-	bridge_subscription = stasis_forward_all(stasis_caching_get_topic(ast_bridge_topic_all_cached()), cdr_topic);
+	bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic);
 	if (!bridge_subscription) {
 		return -1;
 	}

Modified: team/dlee/stasis-cache-split/main/cel.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/cel.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/cel.c (original)
+++ team/dlee/stasis-cache-split/main/cel.c Tue Jul  9 22:07:29 2013
@@ -1463,14 +1463,14 @@
 	}
 
 	cel_channel_forwarder = stasis_forward_all(
-		ast_channel_cache(),
+		ast_channel_topic_all_cached(),
 		cel_aggregation_topic);
 	if (!cel_channel_forwarder) {
 		return -1;
 	}
 
 	cel_bridge_forwarder = stasis_forward_all(
-		stasis_caching_get_topic(ast_bridge_topic_all_cached()),
+		ast_bridge_topic_all_cached(),
 		cel_aggregation_topic);
 	if (!cel_bridge_forwarder) {
 		return -1;

Modified: team/dlee/stasis-cache-split/main/cli.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/cli.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/cli.c (original)
+++ team/dlee/stasis-cache-split/main/cli.c Tue Jul  9 22:07:29 2013
@@ -915,7 +915,7 @@
 		return CLI_SHOWUSAGE;
 
 
-	if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+	if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
 		ast_cli(a->fd, "Failed to retrieve cached channels\n");
 		return CLI_SUCCESS;
 	}
@@ -1438,7 +1438,7 @@
 
 	now = ast_tvnow();
 
-	if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+	if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
 		ast_cli(a->fd, "%s is not a known channel\n", a->argv[3]);
 		return CLI_SUCCESS;
 	}
@@ -1571,7 +1571,7 @@
 		return NULL;
 	}
 
-	if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached(), ast_channel_snapshot_type()))) {
+	if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) {
 		return NULL;
 	}
 

Modified: team/dlee/stasis-cache-split/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/devicestate.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/devicestate.c (original)
+++ team/dlee/stasis-cache-split/main/devicestate.c Tue Jul  9 22:07:29 2013
@@ -196,6 +196,7 @@
 struct stasis_subscription *devstate_message_sub;
 
 static struct stasis_topic *device_state_topic_all;
+static struct stasis_cache *device_state_cache;
 static struct stasis_caching_topic *device_state_topic_cached;
 static struct stasis_topic_pool *device_state_topic_pool;
 
@@ -285,7 +286,7 @@
 	RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
 	struct ast_device_state_message *device_state;
 
-	cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+	cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
 	if (!cached_msg) {
 		return AST_DEVICE_UNKNOWN;
 	}
@@ -586,7 +587,7 @@
 
 	ast_devstate_aggregate_init(&aggregate);
 
-	cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
+	cached = stasis_cache_dump(ast_device_state_cache(), NULL);
 
 	ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
 
@@ -598,7 +599,7 @@
 	RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
 	struct ast_device_state_message *cached_aggregate_device_state;
 
-	cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+	cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
 	if (!cached_aggregate_msg) {
 		return 1;
 	}
@@ -719,6 +720,11 @@
 	return device_state_topic_all;
 }
 
+struct stasis_cache *ast_device_state_cache(void)
+{
+	return device_state_cache;
+}
+
 struct stasis_caching_topic *ast_device_state_topic_cached(void)
 {
 	return device_state_topic_cached;
@@ -777,6 +783,8 @@
 	devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
 	ao2_cleanup(device_state_topic_all);
 	device_state_topic_all = NULL;
+	ao2_cleanup(device_state_cache);
+	device_state_cache = NULL;
 	device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
 	ao2_cleanup(device_state_topic_pool);
@@ -794,7 +802,11 @@
 	if (!device_state_topic_all) {
 		return -1;
 	}
-	device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id);
+	device_state_cache = stasis_cache_create(device_state_get_id);
+	if (!device_state_cache) {
+		return -1;
+	}
+	device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
 	if (!device_state_topic_cached) {
 		return -1;
 	}

Modified: team/dlee/stasis-cache-split/main/manager.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager.c (original)
+++ team/dlee/stasis-cache-split/main/manager.c Tue Jul  9 22:07:29 2013
@@ -3871,7 +3871,7 @@
 	}
 
 	if (all) {
-		if (!(cached_channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+		if (!(cached_channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
 			ast_free(str);
 			astman_send_error(s, m, "Memory Allocation Failure");
 			return 1;
@@ -3879,7 +3879,7 @@
 		it_chans = ao2_iterator_init(cached_channels, 0);
 		msg = ao2_iterator_next(&it_chans);
 	} else {
-		if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), name))) {
+		if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), name))) {
 			astman_send_error(s, m, "No such channel");
 			ast_free(str);
 			return 0;
@@ -5353,7 +5353,7 @@
 		idText[0] = '\0';
 	}
 
-	if (!(channels = stasis_cache_dump(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type()))) {
+	if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
 		astman_send_error(s, m, "Could not get cached channels");
 		return 0;
 	}

Modified: team/dlee/stasis-cache-split/main/manager_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager_bridging.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager_bridging.c (original)
+++ team/dlee/stasis-cache-split/main/manager_bridging.c Tue Jul  9 22:07:29 2013
@@ -338,7 +338,7 @@
 		ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
 	}
 
-	bridges = stasis_cache_dump(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type());
+	bridges = stasis_cache_dump(ast_bridge_cache(), ast_bridge_snapshot_type());
 	if (!bridges) {
 		astman_send_error(s, m, "Internal error");
 		return -1;
@@ -401,7 +401,7 @@
 		ast_str_set(&id_text, 0, "ActionID: %s\r\n", id);
 	}
 
-	msg = stasis_cache_get(ast_bridge_topic_all_cached(), ast_bridge_snapshot_type(), bridge_uniqueid);
+	msg = stasis_cache_get(ast_bridge_cache(), ast_bridge_snapshot_type(), bridge_uniqueid);
 	if (!msg) {
 		astman_send_error(s, m, "Specified BridgeUniqueid not found");
 		return -1;
@@ -458,7 +458,7 @@
 		return -1;
 	}
 
-	bridge_topic = stasis_caching_get_topic(ast_bridge_topic_all_cached());
+	bridge_topic = ast_bridge_topic_all_cached();
 	if (!bridge_topic) {
 		return -1;
 	}

Modified: team/dlee/stasis-cache-split/main/manager_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager_channels.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager_channels.c (original)
+++ team/dlee/stasis-cache-split/main/manager_channels.c Tue Jul  9 22:07:29 2013
@@ -1259,7 +1259,7 @@
 	if (!message_router) {
 		return -1;
 	}
-	channel_topic = ast_channel_cache();
+	channel_topic = ast_channel_topic_all_cached();
 	if (!channel_topic) {
 		return -1;
 	}

Modified: team/dlee/stasis-cache-split/main/manager_endpoints.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/manager_endpoints.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/manager_endpoints.c (original)
+++ team/dlee/stasis-cache-split/main/manager_endpoints.c Tue Jul  9 22:07:29 2013
@@ -53,7 +53,7 @@
 	 * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
 	 * to dig to make sure I don't break anything, though.
 	 */
-	stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached), message);
+	stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
 }
 
 int manager_endpoints_init(void)

Modified: team/dlee/stasis-cache-split/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/pbx.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/pbx.c (original)
+++ team/dlee/stasis-cache-split/main/pbx.c Tue Jul  9 22:07:29 2013
@@ -8043,7 +8043,7 @@
 	if (a->argc != e->args + 1)
 		return CLI_SHOWUSAGE;
 
-	if (!(msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
+	if (!(msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), a->argv[3]))) {
 		ast_cli(a->fd, "Channel '%s' not found\n", a->argv[e->args]);
 		return CLI_FAILURE;
 	}

Modified: team/dlee/stasis-cache-split/main/presencestate.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/presencestate.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/presencestate.c (original)
+++ team/dlee/stasis-cache-split/main/presencestate.c Tue Jul  9 22:07:29 2013
@@ -55,6 +55,7 @@
 
 STASIS_MESSAGE_TYPE_DEFN(ast_presence_state_message_type);
 struct stasis_topic *presence_state_topic_all;
+struct stasis_cache *presence_state_cache;
 struct stasis_caching_topic *presence_state_topic_cached;
 
 /*! \brief  A presence state provider */
@@ -95,7 +96,7 @@
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	struct ast_presence_state_message *presence_state;
 
-	msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider);
+	msg = stasis_cache_get(ast_presence_state_cache(), ast_presence_state_message_type(), presence_provider);
 
 	if (!msg) {
 		return res;
@@ -294,9 +295,14 @@
 	return presence_state_topic_all;
 }
 
-struct stasis_caching_topic *ast_presence_state_topic_cached(void)
-{
-	return presence_state_topic_cached;
+struct stasis_cache *ast_presence_state_cache(void)
+{
+	return presence_state_cache;
+}
+
+struct stasis_topic *ast_presence_state_topic_cached(void)
+{
+	return stasis_caching_get_topic(presence_state_topic_cached);
 }
 
 static const char *presence_state_get_id(struct stasis_message *msg)
@@ -314,6 +320,8 @@
 {
 	ao2_cleanup(presence_state_topic_all);
 	presence_state_topic_all = NULL;
+	ao2_cleanup(presence_state_cache);
+	presence_state_cache = NULL;
 	presence_state_topic_cached = stasis_caching_unsubscribe_and_join(presence_state_topic_cached);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_presence_state_message_type);
 }
@@ -331,7 +339,12 @@
 		return -1;
 	}
 
-	presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id);
+	presence_state_cache = stasis_cache_create(presence_state_get_id);
+	if (!presence_state_cache) {
+		return -1;
+	}
+
+	presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_cache);
 	if (!presence_state_topic_cached) {
 		return -1;
 	}

Modified: team/dlee/stasis-cache-split/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis.c (original)
+++ team/dlee/stasis-cache-split/main/stasis.c Tue Jul  9 22:07:29 2013
@@ -653,6 +653,11 @@
 		return -1;
 	}
 
+	if (stasis_wait_init() != 0) {
+		ast_log(LOG_ERROR, "Stasis initialization failed\n");
+		return -1;
+	}
+
 	if (pool) {
 		ast_log(LOG_ERROR, "Stasis double-initialized\n");
 		return -1;

Modified: team/dlee/stasis-cache-split/main/stasis_bridging.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_bridging.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_bridging.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_bridging.c Tue Jul  9 22:07:29 2013
@@ -367,15 +367,6 @@
 STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami);
 /*! @} */
 
-/*! \brief Aggregate topic for bridge messages */
-static struct stasis_topic *bridge_topic_all;
-
-/*! \brief Caching aggregate topic for bridge snapshots */
-static struct stasis_caching_topic *bridge_topic_all_cached;
-
-/*! \brief Topic pool for individual bridge topics */
-static struct stasis_topic_pool *bridge_topic_pool;
-
 /*! \brief Destructor for bridge snapshots */
 static void bridge_snapshot_dtor(void *obj)
 {
@@ -420,25 +411,6 @@
 	return snapshot;
 }
 
-struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
-{
-	struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
-	if (!bridge_topic) {
-		return ast_bridge_topic_all();
-	}
-	return bridge_topic;
-}
-
-struct stasis_topic *ast_bridge_topic_all(void)
-{
-	return bridge_topic_all;
-}
-
-struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
-{
-	return bridge_topic_all_cached;
-}
-
 void ast_bridge_publish_state(struct ast_bridge *bridge)
 {
 	RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -459,7 +431,8 @@
 	stasis_publish(ast_bridge_topic(bridge), msg);
 }
 
-static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
+static void bridge_publish_state_from_blob(struct ast_bridge *bridge,
+	struct ast_bridge_blob *obj)
 {
 	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
@@ -470,7 +443,7 @@
 		return;
 	}
 
-	stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
+	stasis_publish(ast_bridge_topic(bridge), msg);
 }
 
 /*! \brief Destructor for bridge merge messages */
@@ -592,7 +565,7 @@
 
 	/* enter blob first, then state */
 	stasis_publish(ast_bridge_topic(bridge), msg);
-	bridge_publish_state_from_blob(stasis_message_data(msg));
+	bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
 }
 
 void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
@@ -605,7 +578,7 @@
 	}
 
 	/* state first, then leave blob (opposite of enter, preserves nesting of events) */
-	bridge_publish_state_from_blob(stasis_message_data(msg));
+	bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
 	stasis_publish(ast_bridge_topic(bridge), msg);
 }
 
@@ -1000,7 +973,7 @@
 
 	ast_assert(!ast_strlen_zero(uniqueid));
 
-	message = stasis_cache_get(ast_bridge_topic_all_cached(),
+	message = stasis_cache_get(ast_bridge_cache(),
 			ast_bridge_snapshot_type(),
 			uniqueid);
 	if (!message) {
@@ -1017,13 +990,6 @@
 
 static void stasis_bridging_cleanup(void)
 {
-	ao2_cleanup(bridge_topic_all);
-	bridge_topic_all = NULL;
-	bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
-		bridge_topic_all_cached);
-	ao2_cleanup(bridge_topic_pool);
-	bridge_topic_pool = NULL;
-
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
@@ -1033,7 +999,7 @@
 }
 
 /*! \brief snapshot ID getter for caching topic */
-static const char *bridge_snapshot_get_id(struct stasis_message *msg)
+__attribute__((unused)) static const char *bridge_snapshot_get_id(struct stasis_message *msg)
 {
 	struct ast_bridge_snapshot *snapshot;
 	if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
@@ -1053,11 +1019,6 @@
 	STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
 	STASIS_MESSAGE_TYPE_INIT(ast_blind_transfer_type);
 	STASIS_MESSAGE_TYPE_INIT(ast_attended_transfer_type);
-	bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
-	bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
-	bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
-
-	return !bridge_topic_all
-		|| !bridge_topic_all_cached
-		|| !bridge_topic_pool ? -1 : 0;
-}
+
+	return 0;
+}

Modified: team/dlee/stasis-cache-split/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_cache.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_cache.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_cache.c Tue Jul  9 22:07:29 2013
@@ -44,15 +44,18 @@
 #endif
 
 /*! \internal */
+struct stasis_cache {
+	struct ao2_container *entries;
+	snapshot_get_id id_fn;
+};
+
+/*! \internal */
 struct stasis_caching_topic {
-	struct ao2_container *cache;
+	struct stasis_cache *cache;
 	struct stasis_topic *topic;
 	struct stasis_topic *original_topic;
 	struct stasis_subscription *sub;
-	snapshot_get_id id_fn;
 };
-
-static struct stasis_message_type *cache_guarantee_type(void);
 
 static void stasis_caching_topic_dtor(void *obj) {
 	struct stasis_caching_topic *caching_topic = obj;
@@ -173,28 +176,80 @@
 	return 0;
 }
 
-static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
+static void cache_dtor(void *obj)
+{
+	struct stasis_cache *cache = obj;
+
+	ao2_cleanup(cache->entries);
+	cache->entries = NULL;
+}
+
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+{
+	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
+
+	cache = ao2_alloc(sizeof(*cache), cache_dtor);
+	if (!cache) {
+		return NULL;
+	}
+
+	cache->entries = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash,
+		cache_entry_cmp);
+	if (!cache->entries) {
+		return NULL;
+	}
+
+	cache->id_fn = id_fn;
+
+	ao2_ref(cache, +1);
+	return cache;
+}
+
+struct stasis_message *stasis_cache_get(struct stasis_cache *cache,
+	struct stasis_message_type *type, const char *id)
+{
+	RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
+	RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
+
+	search_entry = cache_entry_create(type, id, NULL);
+	if (search_entry == NULL) {
+		return NULL;
+	}
+
+	cached_entry = ao2_find(cache->entries, search_entry, OBJ_POINTER);
+	if (cached_entry == NULL) {
+		return NULL;
+	}
+
+	ast_assert(cached_entry->snapshot != NULL);
+	ao2_ref(cached_entry->snapshot, +1);
+	return cached_entry->snapshot;
+}
+
+static struct stasis_message *cache_put(struct stasis_cache *cache,
+	struct stasis_message_type *type, const char *id,
+	struct stasis_message *new_snapshot)
 {
 	RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
 	RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
 	struct stasis_message *old_snapshot = NULL;
 
-	ast_assert(caching_topic->cache != NULL);
+	ast_assert(cache->entries != NULL);
 
 	new_entry = cache_entry_create(type, id, new_snapshot);
 
 	if (new_snapshot == NULL) {
 		/* Remove entry from cache */
-		cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
+		cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_UNLINK);
 		if (cached_entry) {
 			old_snapshot = cached_entry->snapshot;
 			cached_entry->snapshot = NULL;
 		}
 	} else {
 		/* Insert/update cache */
-		SCOPED_AO2LOCK(lock, caching_topic->cache);
-
-		cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
+		SCOPED_AO2LOCK(lock, cache->entries);
+
+		cached_entry = ao2_find(cache->entries, new_entry, OBJ_POINTER | OBJ_NOLOCK);
 		if (cached_entry) {
 			/* Update cache. Because objects are moving, no need to update refcounts. */
 			old_snapshot = cached_entry->snapshot;
@@ -202,83 +257,12 @@
 			new_entry->snapshot = NULL;
 		} else {
 			/* Insert into the cache */
-			ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
+			ao2_link_flags(cache->entries, new_entry, OBJ_NOLOCK);
 		}
 
 	}
 
 	return old_snapshot;
-}
-
-/*! \internal */
-struct caching_guarantee {
-	ast_mutex_t lock;
-	ast_cond_t cond;
-	unsigned int done:1;
-};
-
-static void caching_guarantee_dtor(void *obj)
-{
-	struct caching_guarantee *guarantee = obj;
-
-	ast_assert(guarantee->done == 1);
-
-	ast_mutex_destroy(&guarantee->lock);
-	ast_cond_destroy(&guarantee->cond);
-}
-
-static struct stasis_message *caching_guarantee_create(void)
-{
-	RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
-	if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
-		return NULL;
-	}
-
-	ast_mutex_init(&guarantee->lock);
-	ast_cond_init(&guarantee->cond, NULL);
-
-	if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
-		return NULL;
-	}
-
-	ao2_ref(msg, +1);
-	return msg;
-}
-
-struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, unsigned int guaranteed)
-{
-	RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
-	RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
-
-	ast_assert(caching_topic->cache != NULL);
-
-	if (guaranteed) {
-		RAII_VAR(struct stasis_message *, msg, caching_guarantee_create(), ao2_cleanup);
-		struct caching_guarantee *guarantee = stasis_message_data(msg);
-
-		ast_mutex_lock(&guarantee->lock);
-		stasis_publish(caching_topic->original_topic, msg);
-		while (!guarantee->done) {
-			ast_cond_wait(&guarantee->cond, &guarantee->lock);
-		}
-		ast_mutex_unlock(&guarantee->lock);
-	}
-
-	search_entry = cache_entry_create(type, id, NULL);
-	if (search_entry == NULL) {
-		return NULL;
-	}
-
-	cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
-	if (cached_entry == NULL) {
-		return NULL;
-	}
-
-	ast_assert(cached_entry->snapshot != NULL);
-	ao2_ref(cached_entry->snapshot, +1);
-	return cached_entry->snapshot;
 }
 
 struct cache_dump_data {
@@ -298,11 +282,11 @@
 	return 0;
 }
 
-struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
+struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
 {
 	struct cache_dump_data cache_dump;
 
-	ast_assert(caching_topic->cache != NULL);
+	ast_assert(cache->entries != NULL);
 
 	cache_dump.type = type;
 	cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
@@ -310,13 +294,12 @@
 		return NULL;
 	}
 
-	ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
+	ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
 	return cache_dump.cached;
 }
 
 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
-STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
 
 struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
 {
@@ -390,22 +373,10 @@
 	const char *id = NULL;
 
 	ast_assert(caching_topic->topic != NULL);
-	ast_assert(caching_topic->id_fn != NULL);
+	ast_assert(caching_topic->cache->id_fn != NULL);
 
 	if (stasis_subscription_final_message(sub, message)) {
 		caching_topic_needs_unref = caching_topic;
-	}
-
-	/* Handle cache guarantee event */
-	if (cache_guarantee_type() == stasis_message_type(message)) {
-		struct caching_guarantee *guarantee = stasis_message_data(message);
-
-		ast_mutex_lock(&guarantee->lock);
-		guarantee->done = 1;
-		ast_cond_signal(&guarantee->cond);
-		ast_mutex_unlock(&guarantee->lock);
-
-		return;
 	}
 
 	/* Handle cache clear event */
@@ -413,13 +384,13 @@
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
 		RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
 		struct stasis_message *clear_msg = stasis_message_data(message);
-		const char *clear_id = caching_topic->id_fn(clear_msg);
+		const char *clear_id = caching_topic->cache->id_fn(clear_msg);
 		struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
 
 		ast_assert(clear_type != NULL);
 
 		if (clear_id) {
-			old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL);
+			old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
 			if (old_snapshot) {
 				update = update_create(topic, old_snapshot, NULL);
 				stasis_publish(caching_topic->topic, update);
@@ -433,7 +404,7 @@
 		}
 	}
 
-	id = caching_topic->id_fn(message);
+	id = caching_topic->cache->id_fn(message);
 	if (id == NULL) {
 		/* Object isn't cached; forward */
 		stasis_forward_message(caching_topic->topic, topic, message);
@@ -442,7 +413,7 @@
 		RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
 		RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
 
-		old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
+		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
 
 		update = update_create(topic, old_snapshot, message);
 		if (update == NULL) {
@@ -457,7 +428,8 @@
 	}
 }
 
-struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
+struct stasis_caching_topic *stasis_caching_topic_create(
+	struct stasis_topic *original_topic, struct stasis_cache *cache)
 {
 	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
 	struct stasis_subscription *sub;
@@ -474,18 +446,13 @@
 		return NULL;
 	}
 
-	caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
-	if (!caching_topic->cache) {
-		ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
-		return NULL;
-	}
+	ao2_ref(cache, +1);
+	caching_topic->cache = cache;
 
 	caching_topic->topic = stasis_topic_create(new_name);
 	if (caching_topic->topic == NULL) {
 		return NULL;
 	}
-
-	caching_topic->id_fn = id_fn;
 
 	sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
 	if (sub == NULL) {
@@ -507,7 +474,6 @@
 {
 	STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
 	STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
-	STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
 }
 
 int stasis_cache_init(void)
@@ -522,10 +488,6 @@
 		return -1;
 	}
 
-	if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
-		return -1;
-	}
-
 	return 0;
 }
 

Modified: team/dlee/stasis-cache-split/main/stasis_channels.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-cache-split/main/stasis_channels.c?view=diff&rev=393967&r1=393966&r2=393967
==============================================================================
--- team/dlee/stasis-cache-split/main/stasis_channels.c (original)
+++ team/dlee/stasis-cache-split/main/stasis_channels.c Tue Jul  9 22:07:29 2013
@@ -59,26 +59,7 @@
 
 #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
 
-/*! \brief Topic for all channels */
-struct stasis_topic *channel_topic_all;
-
-/*! \brief Caching topic for all channels */
-struct stasis_caching_topic *channel_topic_all_cached;
-
-/*! \brief Caching topic for all channels indexed by name */
-struct stasis_caching_topic *channel_topic_all_cached_by_name;
-
-struct stasis_topic *ast_channel_topic_all(void)
-{
-	return channel_topic_all;
-}
-
-struct stasis_caching_topic *ast_channel_topic_all_cached(void)
-{
-	return channel_topic_all_cached;
-}
-
-static const char *channel_snapshot_get_id(struct stasis_message *message)
+__attribute__((unused)) static const char *channel_snapshot_get_id(struct stasis_message *message)
 {
 	struct ast_channel_snapshot *snapshot;
 	if (ast_channel_snapshot_type() != stasis_message_type(message)) {
@@ -88,12 +69,7 @@
 	return snapshot->uniqueid;
 }
 
-struct stasis_caching_topic *ast_channel_topic_all_cached_by_name(void)
-{
-	return channel_topic_all_cached_by_name;
-}
-
-static const char *channel_snapshot_get_name(struct stasis_message *message)
+__attribute__((unused)) static const char *channel_snapshot_get_name(struct stasis_message *message)
 {
 	struct ast_channel_snapshot *snapshot;
 	if (ast_channel_snapshot_type() != stasis_message_type(message)) {
@@ -431,7 +407,7 @@
 
 	ast_assert(!ast_strlen_zero(uniqueid));
 
-	message = stasis_cache_get(ast_channel_topic_all_cached(),
+	message = stasis_cache_get(ast_channel_cache(),
 			ast_channel_snapshot_type(),
 			uniqueid);
 	if (!message) {
@@ -453,7 +429,7 @@
 
 	ast_assert(!ast_strlen_zero(name));
 
-	message = stasis_cache_get(ast_channel_topic_all_cached_by_name(),
+	message = stasis_cache_get(ast_channel_cache_by_name(),
 			ast_channel_snapshot_type(),
 			name);
 	if (!message) {
@@ -832,10 +808,6 @@
 
 static void stasis_channels_cleanup(void)
 {
-	channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
-	channel_topic_all_cached_by_name = stasis_caching_unsubscribe_and_join(channel_topic_all_cached_by_name);
-	ao2_cleanup(channel_topic_all);
-	channel_topic_all = NULL;
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);

[... 260 lines stripped ...]



More information about the svn-commits mailing list