[asterisk-commits] kmoore: branch kmoore/stasis-device_state r383037 - in /team/kmoore/stasis-de...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 13 14:02:34 CDT 2013


Author: kmoore
Date: Wed Mar 13 14:02:30 2013
New Revision: 383037

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383037
Log:
Collapse the device state topics/caches into one

This makes the stasis_device_state_topic_all() topic your one-stop shop
for all things device state.

Modified:
    team/kmoore/stasis-device_state/apps/app_queue.c
    team/kmoore/stasis-device_state/include/asterisk/devicestate.h
    team/kmoore/stasis-device_state/main/ccss.c
    team/kmoore/stasis-device_state/main/devicestate.c
    team/kmoore/stasis-device_state/main/pbx.c
    team/kmoore/stasis-device_state/res/res_jabber.c
    team/kmoore/stasis-device_state/res/res_xmpp.c

Modified: team/kmoore/stasis-device_state/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/apps/app_queue.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/apps/app_queue.c (original)
+++ team/kmoore/stasis-device_state/apps/app_queue.c Wed Mar 13 14:02:30 2013
@@ -1749,6 +1749,10 @@
 	}
 
 	dev_state = stasis_message_data(msg);
+	if (dev_state->eid) {
+		/* ignore non-aggregate states */
+		return;
+	}
 
 	qiter = ao2_iterator_init(queues, 0);
 	while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {

Modified: team/kmoore/stasis-device_state/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/include/asterisk/devicestate.h?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/include/asterisk/devicestate.h (original)
+++ team/kmoore/stasis-device_state/include/asterisk/devicestate.h Wed Mar 13 14:02:30 2013
@@ -280,25 +280,9 @@
 		AST_STRING_FIELD(device);	/*!< The name of the device */
 	);
 	enum ast_device_state state;		/*!< The state of the device */
-	struct ast_eid eid;			/*!< The EID of the server where this message originated */
+	struct ast_eid *eid;			/*!< The EID of the server where this message originated, NULL EID means aggregate state */
 	enum ast_devstate_cache cachable;	/*!< Flag designating the cachability of this device state */
 };
-
-/*!
- * \brief Get the Stasis topic for cluster-wide device state messages.
- * \retval The topic for device state messages with EID
- * \retval NULL if it has not been allocated
- * \since 12
- */
-struct stasis_topic *stasis_device_state_cluster_topic_all(void);
-
-/*!
- * \brief Get the Stasis caching topic for cluster-wide device state messages
- * \retval The caching topic for MWI messages
- * \retval NULL if it has not been allocated
- * \since 12
- */
-struct stasis_caching_topic *stasis_device_state_cluster_topic_cached(void);
 
 /*!
  * \brief Get the Stasis topic for aggregated device state messages
@@ -351,7 +335,7 @@
  * \since 12
  */
 #define stasis_publish_device_state(device, state, cachable) \
-	stasis_publish_device_state_full(device, state, cachable, NULL)
+	stasis_publish_device_state_full(device, state, cachable, &ast_eid_default)
 
 /*!
  * \brief Publish a device state update with EID

Modified: team/kmoore/stasis-device_state/main/ccss.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/ccss.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/main/ccss.c (original)
+++ team/kmoore/stasis-device_state/main/ccss.c Wed Mar 13 14:02:30 2013
@@ -1297,10 +1297,10 @@
 
 static int generic_monitor_devstate_tp_cb(void *data)
 {
-	struct stasis_device_state *gtcd = data;
-	enum ast_device_state new_state = gtcd->state;
-	enum ast_device_state previous_state = gtcd->state;
-	const char *monitor_name = gtcd->device;
+	struct stasis_device_state *dev_state = data;
+	enum ast_device_state new_state = dev_state->state;
+	enum ast_device_state previous_state = dev_state->state;
+	const char *monitor_name = dev_state->device;
 	struct generic_monitor_instance_list *generic_list;
 	struct generic_monitor_instance *generic_instance;
 
@@ -1309,14 +1309,14 @@
 		 * time between subscribing to its device state and the time this executes.
 		 * Not really a big deal.
 		 */
-		ao2_cleanup(gtcd);
+		ao2_cleanup(dev_state);
 		return 0;
 	}
 
 	if (generic_list->current_state == new_state) {
 		/* The device state hasn't actually changed, so we don't really care */
 		cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
-		ao2_cleanup(gtcd);
+		ao2_cleanup(dev_state);
 		return 0;
 	}
 
@@ -1336,7 +1336,7 @@
 		}
 	}
 	cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
-	ao2_cleanup(gtcd);
+	ao2_cleanup(dev_state);
 	return 0;
 }
 
@@ -1347,17 +1347,21 @@
 	 * so that all monitor operations can be serialized. Locks?! We don't need
 	 * no steenkin' locks!
 	 */
-	struct stasis_device_state *gtcd;
+	struct stasis_device_state *dev_state;
 	if (stasis_device_state() != stasis_message_type(msg)) {
 		return;
 	}
 
-	gtcd = stasis_message_data(msg);
-
-	ao2_ref(gtcd, +1);
-	if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, gtcd)) {
-		ao2_cleanup(gtcd);
-	}
+	dev_state = stasis_message_data(msg);
+	if (dev_state->eid) {
+		/* ignore non-aggregate states */
+		return;
+	}
+
+	if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, dev_state)) {
+		return;
+	}
+	ao2_ref(dev_state, +1);
 }
 
 int ast_cc_available_timer_expire(const void *data)
@@ -2623,13 +2627,19 @@
 {
 	struct ast_cc_agent *agent = userdata;
 	enum ast_device_state new_state;
-	struct stasis_device_state *dev_state = stasis_message_data(msg);
+	struct stasis_device_state *dev_state;
 	struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
 
 	if (stasis_subscription_final_message(sub, msg)) {
 		cc_unref(agent, "Done holding ref for subscription");
 		return;
 	} else if (stasis_device_state() != stasis_message_type(msg)) {
+		return;
+	}
+
+	dev_state = stasis_message_data(msg);
+	if (dev_state->eid) {
+		/* ignore non-aggregate states */
 		return;
 	}
 

Modified: team/kmoore/stasis-device_state/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/devicestate.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/main/devicestate.c (original)
+++ team/kmoore/stasis-device_state/main/devicestate.c Wed Mar 13 14:02:30 2013
@@ -193,8 +193,6 @@
 
 struct stasis_subscription *devstate_message_sub;
 
-static struct stasis_topic *__device_state_cluster_topic_all;
-static struct stasis_caching_topic *__device_state_cluster_topic_cached;
 static struct stasis_topic *__device_state_topic_all;
 static struct stasis_caching_topic *__device_state_topic_cached;
 static struct stasis_message_type *__device_state_message_type;
@@ -549,7 +547,8 @@
 	char *device = data;
 	struct stasis_device_state *dev_state = stasis_message_data(msg);
 
-	if (strcmp(device, dev_state->device)) {
+	if (!dev_state->eid || strcmp(device, dev_state->device)) {
+		/* ignore aggregate states and devices that don't match */
 		return 0;
 	}
 	ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
@@ -562,6 +561,7 @@
 {
 	struct stasis_device_state *device_state = obj;
 	ast_string_field_free_memory(device_state);
+	ast_free(device_state->eid);
 }
 
 static struct stasis_device_state *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cached, const struct ast_eid *eid)
@@ -575,10 +575,23 @@
 
 	ast_string_field_set(new_dev_state, device, device);
 	new_dev_state->state = state;
+
 	if (eid) {
-		new_dev_state->eid = *eid;
+		char eid_str[20];
+		struct ast_str *uniqueid = ast_str_alloca(256);
+
+		new_dev_state->eid = ast_malloc(sizeof(*eid));
+		if (!new_dev_state->eid) {
+			return NULL;
+		}
+
+		*new_dev_state->eid = *eid;
+		ast_eid_to_str(eid_str, sizeof(eid_str), new_dev_state->eid);
+		ast_str_set(&uniqueid, 0, "%s%s", eid_str, device);
+		ast_string_field_set(new_dev_state, uniqueid, ast_str_buffer(uniqueid));
 	} else {
-		ast_set_default_eid(&new_dev_state->eid);
+		/* no EID makes this an aggregate state */
+		ast_string_field_set(new_dev_state, uniqueid, device);
 	}
 
 	ao2_ref(new_dev_state, +1);
@@ -592,7 +605,7 @@
 
 	ast_devstate_aggregate_init(&agg);
 
-	cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+	cached = stasis_cache_dump(stasis_device_state_topic_cached(), NULL);
 
 	ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
 
@@ -613,8 +626,12 @@
 
 	dev_state = stasis_message_data(msg);
 
+	if (!dev_state->eid) {
+		/* ignore aggregate messages */
+		return;
+	}
+
 	device = (char *)dev_state->device;
-
 	ast_debug(1, "Processing device state change for '%s'\n", device);
 
 	if (dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
@@ -625,7 +642,6 @@
 		RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
 
 		agg_state = get_agg_state(device);
-
 		ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
 			ast_devstate2str(agg_state), device);
 	
@@ -641,17 +657,10 @@
 		}
 	}
 
-	new_agg_state = device_state_alloc(device, agg_state, dev_state->cachable, NULL);
-
 	ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
-		device, ast_devstate2str(new_agg_state->state));
-
-	ast_string_field_set(new_agg_state, uniqueid, device);
-
-	new_agg_msg = stasis_message_create(stasis_device_state(), new_agg_state);
-
-	ast_assert(stasis_device_state_topic(device) != NULL);
-	stasis_publish(stasis_device_state_topic(device), new_agg_msg);
+		device, ast_devstate2str(agg_state));
+
+	stasis_publish_device_state_full(device, agg_state, dev_state->cachable, NULL);
 }
 
 /*! \brief Initialize the device state engine in separate thread */
@@ -704,16 +713,6 @@
 	return agg->state;
 }
 
-struct stasis_topic *stasis_device_state_cluster_topic_all(void)
-{
-	return __device_state_cluster_topic_all;
-}
-
-struct stasis_caching_topic *stasis_device_state_cluster_topic_cached(void)
-{
-	return __device_state_cluster_topic_cached;
-}
-
 struct stasis_topic *stasis_device_state_topic_all(void)
 {
 	return __device_state_topic_all;
@@ -771,8 +770,6 @@
 {
 	RAII_VAR(struct stasis_device_state *, device_state, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
-	struct ast_str *uniqueid = ast_str_alloca(256);
-	char eid_str[20];
 
 	ast_assert(!ast_strlen_zero(device));
 
@@ -780,16 +777,11 @@
 	if (!device_state) {
 		return -1;
 	}
-	ast_eid_to_str(eid_str, sizeof(eid_str), &device_state->eid);
-
-	ast_str_set(&uniqueid, 0, "%s%s", eid_str, device);
-	ast_string_field_set(device_state, uniqueid, ast_str_buffer(uniqueid));
 
 	message = stasis_message_create(stasis_device_state(), device_state);
 
-	ast_assert(stasis_device_state_topic(ast_str_buffer(uniqueid)) != NULL);
-	stasis_publish(stasis_device_state_cluster_topic_all(), message);
-
+	ast_assert(stasis_device_state_topic(device) != NULL);
+	stasis_publish(stasis_device_state_topic(device), message);
 	return 0;
 }
 
@@ -823,10 +815,6 @@
 
 static void devstate_exit(void)
 {
-	ao2_cleanup(__device_state_cluster_topic_all);
-	__device_state_cluster_topic_all = NULL;
-	stasis_caching_unsubscribe(__device_state_cluster_topic_cached);
-	__device_state_cluster_topic_cached = NULL;
 	ao2_cleanup(__device_state_topic_all);
 	__device_state_topic_all = NULL;
 	stasis_caching_unsubscribe(__device_state_topic_cached);
@@ -842,14 +830,6 @@
 int devstate_init(void)
 {
 	ast_register_atexit(devstate_exit);
-	__device_state_cluster_topic_all = stasis_topic_create("stasis_device_state_cluster_topic");
-	if (!__device_state_cluster_topic_all) {
-		return -1;
-	}
-	__device_state_cluster_topic_cached = stasis_caching_topic_create(__device_state_cluster_topic_all, device_state_get_id);
-	if (!__device_state_cluster_topic_cached) {
-		return -1;
-	}
 	__device_state_topic_all = stasis_topic_create("stasis_device_state_topic");
 	if (!__device_state_topic_all) {
 		return -1;
@@ -867,7 +847,7 @@
 		return -1;
 	}
 
-	devstate_message_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
+	devstate_message_sub = stasis_subscribe(stasis_device_state_topic_all(), devstate_change_collector_cb, NULL);
 
 	if (!devstate_message_sub) {
 		ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");

Modified: team/kmoore/stasis-device_state/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/pbx.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/main/pbx.c (original)
+++ team/kmoore/stasis-device_state/main/pbx.c Wed Mar 13 14:02:30 2013
@@ -5267,6 +5267,10 @@
 	}
 
 	dev_state = stasis_message_data(msg);
+	if (dev_state->eid) {
+		/* ignore non-aggregate states */
+		return;
+	}
 
 	if (ao2_container_count(hintdevices) == 0) {
 		/* There are no hints monitoring devices. */

Modified: team/kmoore/stasis-device_state/res/res_jabber.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/res/res_jabber.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/res/res_jabber.c (original)
+++ team/kmoore/stasis-device_state/res/res_jabber.c Wed Mar 13 14:02:30 2013
@@ -3284,8 +3284,8 @@
 	}
 
 	dev_state = stasis_message_data(msg);
-	if (ast_eid_cmp(&ast_eid_default, &dev_state->eid)) {
-		/* If the event didn't originate from this server, don't send it back out. */
+	if (!dev_state->eid || ast_eid_cmp(&ast_eid_default, dev_state->eid)) {
+		/* If the event is aggregate or didn't originate from this server, don't send it out. */
 		return;
 	}
 
@@ -3314,9 +3314,9 @@
 	if (!device_state_sub) {
 		RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
 		client = ASTOBJ_REF(client);
-		device_state_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(),
+		device_state_sub = stasis_subscribe(stasis_device_state_topic_all(),
 			aji_devstate_cb, client);
-		cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+		cached = stasis_cache_dump(stasis_device_state_topic_cached(), NULL);
 		ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);
 	}
 

Modified: team/kmoore/stasis-device_state/res/res_xmpp.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/res/res_xmpp.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/res/res_xmpp.c (original)
+++ team/kmoore/stasis-device_state/res/res_xmpp.c Wed Mar 13 14:02:30 2013
@@ -1359,8 +1359,8 @@
 	}
 
 	dev_state = stasis_message_data(msg);
-	if (ast_eid_cmp(&ast_eid_default, &dev_state->eid)) {
- 		/* If the event didn't originate from this server, don't send it back out. */
+	if (!dev_state->eid || ast_eid_cmp(&ast_eid_default, dev_state->eid)) {
+ 		/* If the event is aggregate or didn't originate from this server, don't send it out. */
  		return;
  	}
  
@@ -1602,14 +1602,14 @@
 		return;
 	}
 
-	if (!(client->device_state_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), xmpp_pubsub_devstate_cb, client))) {
+	if (!(client->device_state_sub = stasis_subscribe(stasis_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
 		ast_event_unsubscribe(client->mwi_sub);
 		client->mwi_sub = NULL;
 		return;
 	}
 
 	ao2_ref(client, +1);
-	cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+	cached = stasis_cache_dump(stasis_device_state_topic_cached(), NULL);
 	ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);
 
 	xmpp_pubsub_subscribe(client, "device_state");




More information about the asterisk-commits mailing list