[svn-commits] kmoore: branch kmoore/stasis-device_state r383007 - in /team/kmoore/stasis-de...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Wed Mar 13 08:41:30 CDT 2013


Author: kmoore
Date: Wed Mar 13 08:41:27 2013
New Revision: 383007

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383007
Log:
Initial changeover with minor simplification of some state aggregation code

Modified:
    team/kmoore/stasis-device_state/apps/app_queue.c
    team/kmoore/stasis-device_state/include/asterisk/devicestate.h
    team/kmoore/stasis-device_state/include/asterisk/xmpp.h
    team/kmoore/stasis-device_state/main/asterisk.c
    team/kmoore/stasis-device_state/main/ccss.c
    team/kmoore/stasis-device_state/main/devicestate.c
    team/kmoore/stasis-device_state/main/pbx.c
    team/kmoore/stasis-device_state/res/res_jabber.c
    team/kmoore/stasis-device_state/res/res_xmpp.c

Modified: team/kmoore/stasis-device_state/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/apps/app_queue.c?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/apps/app_queue.c (original)
+++ team/kmoore/stasis-device_state/apps/app_queue.c Wed Mar 13 08:41:27 2013
@@ -1037,8 +1037,8 @@
 /*! \brief queues.conf [general] option */
 static int shared_lastcall = 1;
 
-/*! \brief Subscription to device state change events */
-static struct ast_event_sub *device_state_sub;
+/*! \brief Subscription to device state change messages */
+static struct stasis_subscription *device_state_sub;
 
 /*! \brief queues.conf [general] option */
 static int update_cdr = 0;
@@ -1620,8 +1620,7 @@
 
 struct statechange {
 	AST_LIST_ENTRY(statechange) entry;
-	int state;
-	char dev[0];
+	struct stasis_device_state *dev_state;
 };
 
 /*! \brief set a member's status based on device state of that member's state_interface.
@@ -1770,9 +1769,9 @@
 					}
 				}
 
-				if (!strcasecmp(interface, sc->dev)) {
+				if (!strcasecmp(interface, sc->dev_state->device)) {
 					found_member = 1;
-					update_status(q, m, sc->state);
+					update_status(q, m, sc->dev_state->state);
 				}
 			}
 
@@ -1804,36 +1803,40 @@
 	ao2_iterator_destroy(&qiter);
 
 	if (found) {
-		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", sc->dev, sc->state, ast_devstate2str(sc->state));
+		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n",
+			sc->dev_state->device,
+			sc->dev_state->state,
+			ast_devstate2str(sc->dev_state->state));
 	} else {
-		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", sc->dev, sc->state, ast_devstate2str(sc->state));
-	}
-
+		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
+			sc->dev_state->device,
+			sc->dev_state->state,
+			ast_devstate2str(sc->dev_state->state));
+	}
+
+	ao2_cleanup(sc->dev_state);
+	sc->dev_state = NULL;
 	ast_free(sc);
 	return 0;
 }
 
-static void device_state_cb(const struct ast_event *event, void *unused)
-{
-	enum ast_device_state state;
-	const char *device;
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
 	struct statechange *sc;
 	size_t datapsize;
 
-	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
-
-	if (ast_strlen_zero(device)) {
-		ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
+	if (stasis_device_state() != stasis_message_type(msg)) {
 		return;
 	}
-	datapsize = sizeof(*sc) + strlen(device) + 1;
+
+	datapsize = sizeof(*sc);
 	if (!(sc = ast_calloc(1, datapsize))) {
 		ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
 		return;
 	}
-	sc->state = state;
-	strcpy(sc->dev, device);
+
+	sc->dev_state = stasis_message_data(msg);
+	ao2_ref(sc->dev_state, +1);
 	if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
 		ast_free(sc);
 	}
@@ -9873,8 +9876,9 @@
 
 	res |= ast_data_unregister(NULL);
 
-	if (device_state_sub)
-		ast_event_unsubscribe(device_state_sub);
+	if (device_state_sub) {
+		stasis_unsubscribe(device_state_sub);
+	}
 
 	ast_extension_state_del(0, extension_state_cb);
 
@@ -9950,7 +9954,7 @@
 	}
 
 	/* in the following subscribe call, do I use DEVICE_STATE, or DEVICE_STATE_CHANGE? */
-	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, "AppQueue Device state", NULL, AST_EVENT_IE_END))) {
+	if (!(device_state_sub = stasis_subscribe(stasis_device_state_topic_all(), device_state_cb, NULL))) {
 		res = -1;
 	}
 

Modified: team/kmoore/stasis-device_state/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/include/asterisk/devicestate.h?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/include/asterisk/devicestate.h (original)
+++ team/kmoore/stasis-device_state/include/asterisk/devicestate.h Wed Mar 13 08:41:27 2013
@@ -38,6 +38,7 @@
 #define _ASTERISK_DEVICESTATE_H
 
 #include "asterisk/channelstate.h"
+#include "asterisk/utils.h"
 
 #if defined(__cplusplus) || defined(c_plusplus)
 extern "C" {
@@ -270,19 +271,103 @@
 };
 
 /*!
- * \brief Enable distributed device state processing.
- *
- * \details
- * By default, Asterisk assumes that device state change events will only be
- * originating from one instance.  If a module gets loaded and configured such
- * that multiple instances of Asterisk will be sharing device state, this
- * function should be called to enable distributed device state processing.
- * It is off by default to save on unnecessary processing.
- *
- * \retval 0 success
- * \retval -1 failure
- */
-int ast_enable_distributed_devstate(void);
+ * \brief The structure that contains device state
+ * \since 12
+ */
+struct stasis_device_state {
+	AST_DECLARE_STRING_FIELDS(
+		AST_STRING_FIELD(uniqueid);	/*!< A unique ID used for hashing */
+		AST_STRING_FIELD(device);	/*!< The name of the device */
+	);
+	enum ast_device_state state;		/*!< The state of the device */
+	struct ast_eid eid;			/*!< The EID of the server where this message originated */
+	enum ast_devstate_cache cachable;	/*!< Flag designating the cachability of this device state */
+};
+
+/*!
+ * \brief Get the Stasis topic for cluster-wide device state messages.
+ * \retval The topic for device state messages with EID
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_topic *stasis_device_state_cluster_topic_all(void);
+
+/*!
+ * \brief Get the Stasis caching topic for cluster-wide device state messages
+ * \retval The caching topic for cluster-wide device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_device_state_cluster_topic_cached(void);
+
+/*!
+ * \brief Get the Stasis topic for aggregated device state messages
+ * \retval The topic for device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_topic *stasis_device_state_topic_all(void);
+
+/*!
+ * \brief Get the Stasis topic for aggregated device state messages for a specific device
+ * \param device The device for which to get the topic
+ * \retval The topic structure for device state messages for a given device
+ * \retval NULL if it failed to be found or allocated
+ * \since 12
+ */
+struct stasis_topic *stasis_device_state_topic(const char *device);
+
+/*!
+ * \brief Get the Stasis caching topic for device state messages
+ * \retval The caching topic for device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_device_state_topic_cached(void);
+
+/*!
+ * \brief Get the Stasis message type for device state messages
+ * \retval The message type for device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_message_type *stasis_device_state(void);
+
+/*!
+ * \brief Initialize the device state core
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int devstate_init(void);
+
+/*!
+ * \brief Publish a device state update
+ * \param[in] device The device name
+ * \param[in] state The state of the device
+ * \param[in] cachable Whether the device state can be cached
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+#define stasis_publish_device_state(device, state, cachable) \
+	stasis_publish_device_state_full(device, state, cachable, NULL)
+
+/*!
+ * \brief Publish a device state update with EID
+ * \param[in] device The device name
+ * \param[in] state The state of the device
+ * \param[in] cachable Whether the device state can be cached
+ * \param[in] eid The EID of the server that originally published the message
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int stasis_publish_device_state_full(
+			const char *device,
+			enum ast_device_state state,
+			enum ast_devstate_cache cachable,
+			struct ast_eid *eid);
 
 #if defined(__cplusplus) || defined(c_plusplus)
 }

Modified: team/kmoore/stasis-device_state/include/asterisk/xmpp.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/include/asterisk/xmpp.h?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/include/asterisk/xmpp.h (original)
+++ team/kmoore/stasis-device_state/include/asterisk/xmpp.h Wed Mar 13 08:41:27 2013
@@ -47,6 +47,7 @@
 #include "asterisk/linkedlists.h"
 #include "asterisk/stringfields.h"
 #include "asterisk/pbx.h"
+#include "asterisk/stasis.h"
 
 /*
  * As per RFC 3920 - section 3.1, the maximum length for a full Jabber ID
@@ -135,7 +136,7 @@
 	int timeout;
 	unsigned int reconnect:1; /*!< Reconnect this client */
 	struct ast_event_sub *mwi_sub; /*!< If distributing event information the MWI subscription */
-	struct ast_event_sub *device_state_sub; /*!< If distributing event information the device state subscription */
+	struct stasis_subscription *device_state_sub; /*!< If distributing event information the device state subscription */
 };
 
 /*!

Modified: team/kmoore/stasis-device_state/main/asterisk.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/asterisk.c?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/main/asterisk.c (original)
+++ team/kmoore/stasis-device_state/main/asterisk.c Wed Mar 13 08:41:27 2013
@@ -4126,6 +4126,11 @@
 		exit(1);
 	}
 
+	if (devstate_init()) {
+		printf("Device state core initialization failed.\n%s", term_quit());
+		exit(1);
+	}
+
 	ast_makesocket();
 	sigemptyset(&sigs);
 	sigaddset(&sigs, SIGHUP);

Modified: team/kmoore/stasis-device_state/main/ccss.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/ccss.c?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/main/ccss.c (original)
+++ team/kmoore/stasis-device_state/main/ccss.c Wed Mar 13 08:41:27 2013
@@ -1209,7 +1209,7 @@
 	 * recalled
 	 */
 	int fit_for_recall;
-	struct ast_event_sub *sub;
+	struct stasis_subscription *sub;
 	AST_LIST_HEAD_NOLOCK(, generic_monitor_instance) list;
 };
 
@@ -1260,14 +1260,15 @@
 	struct generic_monitor_instance_list *generic_list = obj;
 	struct generic_monitor_instance *generic_instance;
 
-	generic_list->sub = ast_event_unsubscribe(generic_list->sub);
+	stasis_unsubscribe(generic_list->sub);
+	generic_list->sub = NULL;
 	while ((generic_instance = AST_LIST_REMOVE_HEAD(&generic_list->list, next))) {
 		ast_free(generic_instance);
 	}
 	ast_free((char *)generic_list->device_name);
 }
 
-static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
 static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
 {
 	struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
@@ -1285,11 +1286,7 @@
 	ast_tech_to_upper(device_name);
 	generic_list->device_name = device_name;
 
-	if (!(generic_list->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE,
-		generic_monitor_devstate_cb, "Requesting CC", NULL,
-		AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, monitor->interface->device_name,
-		AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS,
-		AST_EVENT_IE_END))) {
+	if (!(generic_list->sub = stasis_subscribe(stasis_device_state_topic(monitor->interface->device_name), generic_monitor_devstate_cb, NULL))) {
 		cc_unref(generic_list, "Failed to subscribe to device state");
 		return NULL;
 	}
@@ -1298,17 +1295,12 @@
 	return generic_list;
 }
 
-struct generic_tp_cb_data {
-	const char *device_name;
-	enum ast_device_state new_state;
-};
-
 static int generic_monitor_devstate_tp_cb(void *data)
 {
-	struct generic_tp_cb_data *gtcd = data;
-	enum ast_device_state new_state = gtcd->new_state;
-	enum ast_device_state previous_state = gtcd->new_state;
-	const char *monitor_name = gtcd->device_name;
+	struct stasis_device_state *gtcd = data;
+	enum ast_device_state new_state = gtcd->state;
+	enum ast_device_state previous_state = gtcd->state;
+	const char *monitor_name = gtcd->device;
 	struct generic_monitor_instance_list *generic_list;
 	struct generic_monitor_instance *generic_instance;
 
@@ -1317,16 +1309,14 @@
 		 * time between subscribing to its device state and the time this executes.
 		 * Not really a big deal.
 		 */
-		ast_free((char *) gtcd->device_name);
-		ast_free(gtcd);
+		ao2_cleanup(gtcd);
 		return 0;
 	}
 
 	if (generic_list->current_state == new_state) {
 		/* The device state hasn't actually changed, so we don't really care */
 		cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
-		ast_free((char *) gtcd->device_name);
-		ast_free(gtcd);
+		ao2_cleanup(gtcd);
 		return 0;
 	}
 
@@ -1346,33 +1336,27 @@
 		}
 	}
 	cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
-	ast_free((char *) gtcd->device_name);
-	ast_free(gtcd);
+	ao2_cleanup(gtcd);
 	return 0;
 }
 
-static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	/* Wow, it's cool that we've picked up on a state change, but we really want
 	 * the actual work to be done in the core's taskprocessor execution thread
 	 * so that all monitor operations can be serialized. Locks?! We don't need
 	 * no steenkin' locks!
 	 */
-	struct generic_tp_cb_data *gtcd = ast_calloc(1, sizeof(*gtcd));
-
-	if (!gtcd) {
+	struct stasis_device_state *gtcd;
+	if (stasis_device_state() != stasis_message_type(msg)) {
 		return;
 	}
 
-	if (!(gtcd->device_name = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE)))) {
-		ast_free(gtcd);
-		return;
-	}
-	gtcd->new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-
+	gtcd = stasis_message_data(msg);
+
+	ao2_ref(gtcd, +1);
 	if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, gtcd)) {
-		ast_free((char *)gtcd->device_name);
-		ast_free(gtcd);
+		ao2_cleanup(gtcd);
 	}
 }
 
@@ -2502,7 +2486,7 @@
 	 * device state of the caller in order to
 	 * determine when we may move on
 	 */
-	struct ast_event_sub *sub;
+	struct stasis_subscription *sub;
 	/*!
 	 * Scheduler id of offer timer.
 	 */
@@ -2635,34 +2619,28 @@
 	return 0;
 }
 
-static int generic_agent_devstate_unsubscribe(void *data)
-{
-	struct ast_cc_agent *agent = data;
-	struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
-
-	if (generic_pvt->sub != NULL) {
-		generic_pvt->sub = ast_event_unsubscribe(generic_pvt->sub);
-	}
-	cc_unref(agent, "Done unsubscribing from devstate");
-	return 0;
-}
-
-static void generic_agent_devstate_cb(const struct ast_event *event, void *userdata)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	struct ast_cc_agent *agent = userdata;
 	enum ast_device_state new_state;
-
-	new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+	struct stasis_device_state *dev_state = stasis_message_data(msg);
+	struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
+
+	if (stasis_subscription_final_message(sub, msg)) {
+		cc_unref(agent, "Done holding ref for subscription");
+		return;
+	} else if (stasis_device_state() != stasis_message_type(msg)) {
+		return;
+	}
+
+	new_state = dev_state->state;
 	if (!cc_generic_is_device_available(new_state)) {
 		/* Not interested in this new state of the device.  It is still busy. */
 		return;
 	}
 
-	/* We can't unsubscribe from device state events here because it causes a deadlock */
-	if (ast_taskprocessor_push(cc_core_taskprocessor, generic_agent_devstate_unsubscribe,
-			cc_ref(agent, "ref agent for device state unsubscription"))) {
-		cc_unref(agent, "Unref agent unsubscribing from devstate failed");
-	}
+	stasis_unsubscribe(sub);
+	generic_pvt->sub = NULL;
 	ast_cc_agent_caller_available(agent->core_id, "%s is no longer busy", agent->device_name);
 }
 
@@ -2675,13 +2653,10 @@
 	ast_str_set(&str, 0, "Agent monitoring %s device state since it is busy\n",
 		agent->device_name);
 
-	if (!(generic_pvt->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE,
-		generic_agent_devstate_cb, ast_str_buffer(str), agent,
-		AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, agent->device_name,
-		AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS,
-		AST_EVENT_IE_END))) {
-		return -1;
-	}
+	if (!(generic_pvt->sub = stasis_subscribe(stasis_device_state_topic(agent->device_name), generic_agent_devstate_cb, agent))) {
+		return -1;
+	}
+	cc_ref(agent, "Ref agent for subscription");
 	return 0;
 }
 
@@ -2792,7 +2767,8 @@
 
 	cc_generic_agent_stop_offer_timer(agent);
 	if (agent_pvt->sub) {
-		agent_pvt->sub = ast_event_unsubscribe(agent_pvt->sub);
+		stasis_unsubscribe(agent_pvt->sub);
+		agent_pvt->sub = NULL;
 	}
 
 	ast_free(agent_pvt);

Modified: team/kmoore/stasis-device_state/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/devicestate.c?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/main/devicestate.c (original)
+++ team/kmoore/stasis-device_state/main/devicestate.c Wed Mar 13 08:41:27 2013
@@ -129,7 +129,10 @@
 #include "asterisk/devicestate.h"
 #include "asterisk/pbx.h"
 #include "asterisk/app.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
 #include "asterisk/event.h"
+#include "asterisk/devicestate.h"
 
 /*! \brief Device state strings for printing */
 static const char * const devstatestring[][2] = {
@@ -190,23 +193,47 @@
 
 struct devstate_change {
 	AST_LIST_ENTRY(devstate_change) entry;
-	uint32_t state;
-	struct ast_eid eid;
-	enum ast_devstate_cache cachable;
-	char device[1];
+	struct stasis_device_state *dev_state;
 };
 
 static struct {
 	pthread_t thread;
-	struct ast_event_sub *event_sub;
+	struct stasis_subscription *event_sub;
 	ast_cond_t cond;
 	ast_mutex_t lock;
 	AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q;
-	unsigned int enabled:1;
 } devstate_collector = {
 	.thread = AST_PTHREADT_NULL,
-	.enabled = 0,
 };
+
+static struct stasis_topic *__device_state_cluster_topic_all;
+static struct stasis_caching_topic *__device_state_cluster_topic_cached;
+static struct stasis_topic *__device_state_topic_all;
+static struct stasis_caching_topic *__device_state_topic_cached;
+static struct stasis_message_type *__device_state_message_type;
+static struct ao2_container *__device_state_topics;
+
+struct device_state_topic {
+	char *uniqueid;
+	struct stasis_subscription *forward;
+	struct stasis_topic *topic;
+};
+
+static void device_state_topic_dtor(void *obj)
+{
+	struct device_state_topic *topic = obj;
+	ast_free(topic->uniqueid);
+	topic->uniqueid = NULL;
+	stasis_unsubscribe(topic->forward);
+	topic->forward = NULL;
+	ao2_cleanup(topic->topic);
+	topic->topic = NULL;
+}
+
+static struct device_state_topic *device_state_topic_alloc(void)
+{
+	return ao2_alloc(sizeof(struct device_state_topic), device_state_topic_dtor);
+}
 
 /* Forward declarations */
 static int getproviderstate(const char *provider, const char *address);
@@ -289,21 +316,16 @@
 
 static enum ast_device_state devstate_cached(const char *device)
 {
-	enum ast_device_state res = AST_DEVICE_UNKNOWN;
-	struct ast_event *event;
-
-	event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
-		AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
-		AST_EVENT_IE_END);
-
-	if (!event)
-		return res;
-
-	res = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-
-	ast_event_destroy(event);
-
-	return res;
+	RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
+	struct stasis_device_state *dev_state;
+
+	cached_msg = stasis_cache_get(stasis_device_state_topic_cached(), stasis_device_state(), device);
+	if (!cached_msg) {
+		return AST_DEVICE_UNKNOWN;
+	}
+	dev_state = stasis_message_data(cached_msg);
+
+	return dev_state->state;
 }
 
 /*! \brief Check device state through channel specific function or generic function */
@@ -426,39 +448,9 @@
 	return res;
 }
 
-static void devstate_event(const char *device, enum ast_device_state state, int cachable)
-{
-	struct ast_event *event;
-	enum ast_event_type event_type;
-
-	if (devstate_collector.enabled) {
-		/* Distributed device state is enabled, so this state change is a change
-		 * for a single server, not the real state. */
-		event_type = AST_EVENT_DEVICE_STATE_CHANGE;
-	} else {
-		event_type = AST_EVENT_DEVICE_STATE;
-	}
-
-	ast_debug(3, "device '%s' state '%d'\n", device, state);
-
-	if (!(event = ast_event_new(event_type,
-				    AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
-				    AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
-				    AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, cachable,
-				    AST_EVENT_IE_END))) {
-		return;
-	}
-
-	if (cachable) {
-		ast_event_queue_and_cache(event);
-	} else {
-		ast_event_queue(event);
-	}
-}
-
 /*! Called by the state change thread to find out what the state is, and then
  *  to queue up the state change event */
-static void do_state_change(const char *device, int cachable)
+static void do_state_change(const char *device, enum ast_devstate_cache cachable)
 {
 	enum ast_device_state state;
 
@@ -466,7 +458,7 @@
 
 	ast_debug(3, "Changing state for %s - state %d (%s)\n", device, state, ast_devstate2str(state));
 
-	devstate_event(device, state, cachable);
+	stasis_publish_device_state(device, state, cachable);
 }
 
 int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_cache cachable, const char *device)
@@ -490,7 +482,7 @@
 	 */
 
 	if (state != AST_DEVICE_UNKNOWN) {
-		devstate_event(device, state, cachable);
+		stasis_publish_device_state(device, state, cachable);
 	} else if (change_thread == AST_PTHREADT_NULL || !(change = ast_calloc(1, sizeof(*change) + strlen(device)))) {
 		/* we could not allocate a change struct, or */
 		/* there is no background thread, so process the change now */
@@ -564,6 +556,8 @@
 
 static void destroy_devstate_change(struct devstate_change *sc)
 {
+	ao2_cleanup(sc->dev_state);
+	sc->dev_state = NULL;
 	ast_free(sc);
 }
 
@@ -573,115 +567,98 @@
 	size_t num_states;
 };
 
-static void devstate_cache_cb(const struct ast_event *event, void *data)
-{
-	struct change_collection *collection = data;
-	int i;
-	const struct ast_eid *eid;
-
-	if (collection->num_states == ARRAY_LEN(collection->states)) {
-		ast_log(LOG_ERROR, "More per-server state values than we have room for (MAX_SERVERS is %d)\n",
-			MAX_SERVERS);
-		return;
-	}
-
-	if (!(eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
-		ast_log(LOG_ERROR, "Device state change event with no EID\n");
-		return;
-	}
-
-	i = collection->num_states;
-
-	collection->states[i].state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-	collection->states[i].eid = *eid;
-
-	collection->num_states++;
-}
-
-static void process_collection(const char *device, enum ast_devstate_cache cachable, struct change_collection *collection)
-{
-	int i;
+static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
+{
+	struct stasis_message *msg = obj;
+	struct ast_devstate_aggregate *agg = arg;
+	char *device = data;
+	struct stasis_device_state *dev_state = stasis_message_data(msg);
+
+	if (strcmp(device, dev_state->device)) {
+		return 0;
+	}
+	ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
+		ast_devstate2str(dev_state->state), device);
+	ast_devstate_aggregate_add(agg, dev_state->state);
+	return 0;
+}
+
+static void device_state_dtor(void *obj)
+{
+	struct stasis_device_state *device_state = obj;
+	ast_string_field_free_memory(device_state);
+}
+
+static struct stasis_device_state *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cached, const struct ast_eid *eid)
+{
+	RAII_VAR(struct stasis_device_state *, new_dev_state, NULL, ao2_cleanup);
+	new_dev_state = ao2_alloc(sizeof(*new_dev_state), device_state_dtor);
+
+	if (!new_dev_state || ast_string_field_init(new_dev_state, 256)) {
+		return NULL;
+	}
+
+	ast_string_field_set(new_dev_state, device, device);
+	new_dev_state->state = state;
+	if (eid) {
+		new_dev_state->eid = *eid;
+	} else {
+		ast_set_default_eid(&new_dev_state->eid);
+	}
+
+	ao2_ref(new_dev_state, +1);
+	return new_dev_state;
+}
+
+static void handle_devstate_change(struct devstate_change *sc)
+{
+	RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
 	struct ast_devstate_aggregate agg;
-	enum ast_device_state state;
-	struct ast_event *event;
+	enum ast_device_state agg_state;
+	char *device = (char *)sc->dev_state->device;
+	RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, new_agg_msg, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_device_state *, new_agg_state, NULL, ao2_cleanup);
 
 	ast_devstate_aggregate_init(&agg);
 
-	for (i = 0; i < collection->num_states; i++) {
-		ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
-			ast_devstate2str(collection->states[i].state), device);
-		ast_devstate_aggregate_add(&agg, collection->states[i].state);
-	}
-
-	state = ast_devstate_aggregate_result(&agg);
+	ast_debug(1, "Processing device state change for '%s'\n", device);
+
+	cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+
+	ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
+
+	agg_state = ast_devstate_aggregate_result(&agg);
+
+	cached_agg_msg = stasis_cache_get(stasis_device_state_topic_cached(), stasis_device_state(), device);
 
 	ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
-		ast_devstate2str(state), device);
-
-	event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
-		AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
-		AST_EVENT_IE_END);
-
-	if (event) {
-		enum ast_device_state old_state;
-
-		old_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-
-		ast_event_destroy(event);
-
-		if (state == old_state) {
+		ast_devstate2str(agg_state), device);
+
+	if (cached_agg_msg) {
+		struct stasis_device_state *cached_agg_dev_state = stasis_message_data(cached_agg_msg);
+		if (cached_agg_dev_state->state == agg_state) {
 			/* No change since last reported device state */
 			ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
-				device, ast_devstate2str(state));
+				sc->dev_state->device, ast_devstate2str(agg_state));
 			return;
 		}
 	}
 
+	new_agg_state = device_state_alloc(device,
+					sc->dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE ? sc->dev_state->state : agg_state,
+					sc->dev_state->cachable,
+					NULL);
+
 	ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
-		device, ast_devstate2str(state));
-
-	event = ast_event_new(AST_EVENT_DEVICE_STATE,
-		AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
-		AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
-		AST_EVENT_IE_END);
-
-	if (!event) {
-		return;
-	}
-
-	if (cachable) {
-		ast_event_queue_and_cache(event);
-	} else {
-		ast_event_queue(event);
-	}
-}
-
-static void handle_devstate_change(struct devstate_change *sc)
-{
-	struct ast_event_sub *tmp_sub;
-	struct change_collection collection = {
-		.num_states = 0,
-	};
-
-	ast_debug(1, "Processing device state change for '%s'\n", sc->device);
-
-	if (!(tmp_sub = ast_event_subscribe_new(AST_EVENT_DEVICE_STATE_CHANGE, devstate_cache_cb, &collection))) {
-		ast_log(LOG_ERROR, "Failed to create subscription\n");
-		return;
-	}
-
-	if (ast_event_sub_append_ie_str(tmp_sub, AST_EVENT_IE_DEVICE, sc->device)) {
-		ast_log(LOG_ERROR, "Failed to append device IE\n");
-		ast_event_sub_destroy(tmp_sub);
-		return;
-	}
-
-	/* Populate the collection of device states from the cache */
-	ast_event_dump_cache(tmp_sub);
-
-	process_collection(sc->device, sc->cachable, &collection);
-
-	ast_event_sub_destroy(tmp_sub);
+		device, ast_devstate2str(new_agg_state->state));
+
+	ast_string_field_set(new_agg_state, uniqueid, device);
+
+	new_agg_msg = stasis_message_create(stasis_device_state(), new_agg_state);
+
+	ast_assert(stasis_device_state_topic(device) != NULL);
+	stasis_publish(stasis_device_state_topic(device), new_agg_msg);
 }
 
 static void *run_devstate_collector(void *data)
@@ -702,31 +679,20 @@
 	return NULL;
 }
 
-static void devstate_change_collector_cb(const struct ast_event *event, void *data)
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
 	struct devstate_change *sc;
-	const char *device;
-	const struct ast_eid *eid;
-	uint32_t state;
-	enum ast_devstate_cache cachable = AST_DEVSTATE_CACHABLE;
-
-	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
-	eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
-	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-	cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
-
-	if (ast_strlen_zero(device) || !eid) {
-		ast_log(LOG_ERROR, "Invalid device state change event received\n");
+
+	if (stasis_device_state() != stasis_message_type(msg)) {
 		return;
 	}
 
-	if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device))))
+	if (!(sc = ast_calloc(1, sizeof(*sc)))) {
 		return;
-
-	strcpy(sc->device, device);
-	sc->eid = *eid;
-	sc->state = state;
-	sc->cachable = cachable;
+	}
+
+	sc->dev_state = stasis_message_data(msg);
+	ao2_ref(sc->dev_state, +1);
 
 	ast_mutex_lock(&devstate_collector.lock);
 	AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry);
@@ -784,14 +750,170 @@
 	return agg->state;
 }
 
-int ast_enable_distributed_devstate(void)
-{
-	if (devstate_collector.enabled) {
-		return 0;
-	}
-
-	devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
-		devstate_change_collector_cb, "devicestate_engine_enable_distributed", NULL, AST_EVENT_IE_END);
+struct stasis_topic *stasis_device_state_cluster_topic_all(void)
+{
+	return __device_state_cluster_topic_all;
+}
+
+struct stasis_caching_topic *stasis_device_state_cluster_topic_cached(void)
+{
+	return __device_state_cluster_topic_cached;
+}
+
+struct stasis_topic *stasis_device_state_topic_all(void)
+{
+	return __device_state_topic_all;
+}
+
+struct stasis_caching_topic *stasis_device_state_topic_cached(void)
+{
+	return __device_state_topic_cached;
+}
+
+struct stasis_message_type *stasis_device_state(void)
+{
+	return __device_state_message_type;
+}
+
+struct stasis_topic *stasis_device_state_topic(const char *device)
+{
+	RAII_VAR(struct device_state_topic *, device_state_topic, ao2_find(__device_state_topics, device, OBJ_KEY), ao2_cleanup);
+
+	if (device_state_topic) {
+		return device_state_topic->topic;
+	}
+
+	device_state_topic = device_state_topic_alloc();
+
+	if (!device_state_topic) {
+		return NULL;
+	}
+
+	device_state_topic->topic = stasis_topic_create(device);
+	if (!device_state_topic->topic) {
+		return NULL;
+	}
+
+	device_state_topic->forward = stasis_forward_all(device_state_topic->topic, stasis_device_state_topic_all());
+	if (!device_state_topic->forward) {
+		return NULL;
+	}
+
+	device_state_topic->uniqueid = ast_strdup(device);
+	if (!device_state_topic->uniqueid) {
+		return NULL;
+	}
+
+	ao2_link(__device_state_topics, device_state_topic);
+
+	return device_state_topic->topic;
+}
+
+int stasis_publish_device_state_full(
+			const char *device,
+			enum ast_device_state state,
+			enum ast_devstate_cache cachable,
+			struct ast_eid *eid)
+{
+	RAII_VAR(struct stasis_device_state *, device_state, NULL, ao2_cleanup);
+	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+	struct ast_str *uniqueid = ast_str_alloca(256);
+	char eid_str[20];
+
+	ast_assert(!ast_strlen_zero(device));
+
+	device_state = device_state_alloc(device, state, cachable, eid);
+	if (!device_state) {
+		return -1;
+	}
+	ast_eid_to_str(eid_str, sizeof(eid_str), &device_state->eid);
+
+	ast_str_set(&uniqueid, 0, "%s%s", eid_str, device);
+	ast_string_field_set(device_state, uniqueid, ast_str_buffer(uniqueid));
+
+	message = stasis_message_create(stasis_device_state(), device_state);
+
+	ast_assert(stasis_device_state_topic(ast_str_buffer(uniqueid)) != NULL);
+	stasis_publish(stasis_device_state_cluster_topic_all(), message);
+
+	return 0;
+}
+
+static const char *device_state_get_id(struct stasis_message *message)
+{
+	struct stasis_device_state *device_state;
+	if (stasis_device_state() != stasis_message_type(message)) {
+		return NULL;
+	}
+
+	device_state = stasis_message_data(message);
+	if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
+		return NULL;
+	}
+
+	return device_state->uniqueid;
+}
+
+static int device_state_topic_hash(const void *obj, const int flags)
+{
+	const char *uniqueid = (flags & OBJ_KEY) ? obj : ((struct device_state_topic*) obj)->uniqueid;
+	return ast_str_case_hash(uniqueid);
+}
+
+static int device_state_topic_cmp(void *obj, void *arg, int flags)
+{
+	struct device_state_topic *opt1 = obj, *opt2 = arg;
+	const char *uniqueid = (flags & OBJ_KEY) ? arg : opt2->uniqueid;
+	return strcasecmp(opt1->uniqueid, uniqueid) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
+static void devstate_exit(void)
+{
+	ao2_cleanup(__device_state_cluster_topic_all);
+	__device_state_cluster_topic_all = NULL;
+	stasis_caching_unsubscribe(__device_state_cluster_topic_cached);
+	__device_state_cluster_topic_cached = NULL;
+	ao2_cleanup(__device_state_topic_all);
+	__device_state_topic_all = NULL;
+	stasis_caching_unsubscribe(__device_state_topic_cached);
+	__device_state_topic_cached = NULL;
+	ao2_cleanup(__device_state_message_type);
+	__device_state_message_type = NULL;
+	ao2_cleanup(__device_state_topics);
+	__device_state_topics = NULL;
+}
+
+#define DEVSTATE_TOPIC_BUCKETS 57
+
+int devstate_init(void)
+{
+	ast_register_atexit(devstate_exit);
+	__device_state_cluster_topic_all = stasis_topic_create("stasis_device_state_cluster_topic");
+	if (!__device_state_cluster_topic_all) {
+		return -1;
+	}
+	__device_state_cluster_topic_cached = stasis_caching_topic_create(__device_state_cluster_topic_all, device_state_get_id);
+	if (!__device_state_cluster_topic_cached) {
+		return -1;
+	}
+	__device_state_topic_all = stasis_topic_create("stasis_device_state_topic");
+	if (!__device_state_topic_all) {
+		return -1;
+	}
+	__device_state_topic_cached = stasis_caching_topic_create(__device_state_topic_all, device_state_get_id);
+	if (!__device_state_topic_cached) {
+		return -1;
+	}
+	__device_state_message_type = stasis_message_type_create("stasis_device_state");
+	if (!__device_state_message_type) {
+		return -1;
+	}
+	__device_state_topics = ao2_container_alloc(DEVSTATE_TOPIC_BUCKETS, device_state_topic_hash, device_state_topic_cmp);
+	if (!__device_state_topics) {
+		return -1;
+	}
+
+	devstate_collector.event_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
 
 	if (!devstate_collector.event_sub) {
 		ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
@@ -805,7 +927,5 @@
 		return -1;
 	}
 
-	devstate_collector.enabled = 1;
-
 	return 0;
 }

Modified: team/kmoore/stasis-device_state/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/pbx.c?view=diff&rev=383007&r1=383006&r2=383007
==============================================================================
--- team/kmoore/stasis-device_state/main/pbx.c (original)
+++ team/kmoore/stasis-device_state/main/pbx.c Wed Mar 13 08:41:27 2013
@@ -1131,7 +1131,7 @@
 
 struct statechange {
 	AST_LIST_ENTRY(statechange) entry;
-	char dev[0];
+	struct stasis_device_state *dev_state;
 };
 
 struct pbx_exception {
@@ -1299,7 +1299,7 @@
 static char *overrideswitch = NULL;
 
 /*! \brief Subscription for device state change events */
-static struct ast_event_sub *device_state_sub;
+static struct stasis_subscription *device_state_sub;
 /*! \brief Subscription for presence state change events */
 static struct ast_event_sub *presence_state_sub;
 
@@ -5269,18 +5269,20 @@
 
 	if (ao2_container_count(hintdevices) == 0) {
 		/* There are no hints monitoring devices. */
+		ao2_cleanup(sc->dev_state);
 		ast_free(sc);
 		return 0;
 	}
 
 	hint_app = ast_str_create(1024);
 	if (!hint_app) {
+		ao2_cleanup(sc->dev_state);
 		ast_free(sc);
 		return -1;
 	}
 
-	cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(sc->dev));
-	strcpy(cmpdevice->hintdevice, sc->dev);
+	cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(sc->dev_state->device));
+	strcpy(cmpdevice->hintdevice, sc->dev_state->device);
 
 	ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */
 	dev_iter = ao2_t_callback(hintdevices,
@@ -5291,6 +5293,7 @@
 	if (!dev_iter) {
 		ast_mutex_unlock(&context_merge_lock);
 		ast_free(hint_app);
+		ao2_cleanup(sc->dev_state);
 		ast_free(sc);
 		return -1;
 	}
@@ -5389,6 +5392,7 @@
 
 	ao2_iterator_destroy(dev_iter);
 	ast_free(hint_app);
+	ao2_cleanup(sc->dev_state);
 	ast_free(sc);
 	return 0;
 }
@@ -11715,21 +11719,22 @@
 	}
 }
 
-static void device_state_cb(const struct ast_event *event, void *unused)
-{
-	const char *device;
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
 	struct statechange *sc;
 
-	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
-	if (ast_strlen_zero(device)) {
-		ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
+	if (stasis_device_state() != stasis_message_type(msg)) {
 		return;
 	}
 
-	if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
+	if (!(sc = ast_calloc(1, sizeof(*sc)))) {
 		return;
-	strcpy(sc->dev, device);
+	}
+
+	sc->dev_state = stasis_message_data(msg);
+	ao2_ref(sc->dev_state, +1);
 	if (ast_taskprocessor_push(extension_state_tps, handle_statechange, sc) < 0) {
+		ao2_cleanup(sc->dev_state);
 		ast_free(sc);
 	}
 }
@@ -11794,7 +11799,8 @@
 		presence_state_sub = ast_event_unsubscribe(presence_state_sub);
 	}
 	if (device_state_sub) {
-		device_state_sub = ast_event_unsubscribe(device_state_sub);
+		stasis_unsubscribe(device_state_sub);
+		device_state_sub = NULL;
 	}
 
 	/* Unregister builtin applications */
@@ -11841,8 +11847,7 @@
 	/* Register manager application */
 	ast_manager_register_xml_core("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan);
 

[... 247 lines stripped ...]



More information about the svn-commits mailing list