[asterisk-commits] kmoore: branch kmoore/stasis-device_state r383029 - in /team/kmoore/stasis-de...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 13 10:47:50 CDT 2013


Author: kmoore
Date: Wed Mar 13 10:47:46 2013
New Revision: 383029

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383029
Log:
Rip out now-redundant task processors

Several of the task processors used by device state callbacks are no
longer necessary since Stasis guarantees serialized invocations of
callbacks and uses a thread pool to service callbacks. This change
removes the device state code's usage of the extension state
taskprocessor, completely removes the devstate_collector
pseudo-taskprocessor, and completely removes app_queue's device state
task processor.

Modified:
    team/kmoore/stasis-device_state/apps/app_queue.c
    team/kmoore/stasis-device_state/main/devicestate.c
    team/kmoore/stasis-device_state/main/pbx.c

Modified: team/kmoore/stasis-device_state/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/apps/app_queue.c?view=diff&rev=383029&r1=383028&r2=383029
==============================================================================
--- team/kmoore/stasis-device_state/apps/app_queue.c (original)
+++ team/kmoore/stasis-device_state/apps/app_queue.c Wed Mar 13 10:47:46 2013
@@ -990,9 +990,6 @@
 	{ QUEUE_AUTOPAUSE_ALL,"all" },
 };
 
-
-static struct ast_taskprocessor *devicestate_tps;
-
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
 #define RECHECK			1		/*!< Recheck every second to see we we're at the top yet */
@@ -1618,11 +1615,6 @@
 	return -1;
 }
 
-struct statechange {
-	AST_LIST_ENTRY(statechange) entry;
-	struct stasis_device_state *dev_state;
-};
-
 /*! \brief set a member's status based on device state of that member's state_interface.
  *
  * Lock interface list find sc, iterate through each queues queue_member list for member to
@@ -1741,10 +1733,10 @@
 }
 
 /*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(void *datap)
-{
-	struct statechange *sc = datap;
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
 	struct ao2_iterator miter, qiter;
+	struct stasis_device_state *dev_state;
 	struct member *m;
 	struct call_queue *q;
 	char interface[80], *slash_pos;
@@ -1752,6 +1744,12 @@
 	int found_member;		/* Found this member in this queue */
 	int avail = 0;			/* Found an available member in this queue */
 
+	if (stasis_device_state() != stasis_message_type(msg)) {
+		return;
+	}
+
+	dev_state = stasis_message_data(msg);
+
 	qiter = ao2_iterator_init(queues, 0);
 	while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
 		ao2_lock(q);
@@ -1769,9 +1767,9 @@
 					}
 				}
 
-				if (!strcasecmp(interface, sc->dev_state->device)) {
+				if (!strcasecmp(interface, dev_state->device)) {
 					found_member = 1;
-					update_status(q, m, sc->dev_state->state);
+					update_status(q, m, dev_state->state);
 				}
 			}
 
@@ -1804,42 +1802,17 @@
 
 	if (found) {
 		ast_debug(1, "Device '%s' changed to state '%d' (%s)\n",
-			sc->dev_state->device,
-			sc->dev_state->state,
-			ast_devstate2str(sc->dev_state->state));
+			dev_state->device,
+			dev_state->state,
+			ast_devstate2str(dev_state->state));
 	} else {
 		ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
-			sc->dev_state->device,
-			sc->dev_state->state,
-			ast_devstate2str(sc->dev_state->state));
-	}
-
-	ao2_cleanup(sc->dev_state);
-	sc->dev_state = NULL;
-	ast_free(sc);
-	return 0;
-}
-
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
-{
-	struct statechange *sc;
-	size_t datapsize;
-
-	if (stasis_device_state() != stasis_message_type(msg)) {
-		return;
-	}
-
-	datapsize = sizeof(*sc);
-	if (!(sc = ast_calloc(1, datapsize))) {
-		ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
-		return;
-	}
-
-	sc->dev_state = stasis_message_data(msg);
-	ao2_ref(sc->dev_state, +1);
-	if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
-		ast_free(sc);
-	}
+			dev_state->device,
+			dev_state->state,
+			ast_devstate2str(dev_state->state));
+	}
+
+	return;
 }
 
 /*! \brief Helper function which converts from extension state to device state values */
@@ -9888,7 +9861,6 @@
 		queue_t_unref(q, "Done with iterator");
 	}
 	ao2_iterator_destroy(&q_iter);
-	devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
 	ao2_ref(queues, -1);
 	ast_unload_realtime("queue_members");
 	return res;
@@ -9949,10 +9921,6 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
-		ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
-	}
-
 	/* in the following subscribe call, do I use DEVICE_STATE, or DEVICE_STATE_CHANGE? */
 	if (!(device_state_sub = stasis_subscribe(stasis_device_state_topic_all(), device_state_cb, NULL))) {
 		res = -1;

Modified: team/kmoore/stasis-device_state/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/devicestate.c?view=diff&rev=383029&r1=383028&r2=383029
==============================================================================
--- team/kmoore/stasis-device_state/main/devicestate.c (original)
+++ team/kmoore/stasis-device_state/main/devicestate.c Wed Mar 13 10:47:46 2013
@@ -191,20 +191,7 @@
 /*! \brief Flag for the queue */
 static ast_cond_t change_pending;
 
-struct devstate_change {
-	AST_LIST_ENTRY(devstate_change) entry;
-	struct stasis_device_state *dev_state;
-};
-
-static struct {
-	pthread_t thread;
-	struct stasis_subscription *event_sub;
-	ast_cond_t cond;
-	ast_mutex_t lock;
-	AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q;
-} devstate_collector = {
-	.thread = AST_PTHREADT_NULL,
-};
+struct stasis_subscription *devstate_message_sub;
 
 static struct stasis_topic *__device_state_cluster_topic_all;
 static struct stasis_caching_topic *__device_state_cluster_topic_cached;
@@ -554,19 +541,7 @@
 	return NULL;
 }
 
-static void destroy_devstate_change(struct devstate_change *sc)
-{
-	ao2_cleanup(sc->dev_state);
-	sc->dev_state = NULL;
-	ast_free(sc);
-}
-
 #define MAX_SERVERS 64
-struct change_collection {
-	struct devstate_change states[MAX_SERVERS];
-	size_t num_states;
-};
-
 static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
 {
 	struct stasis_message *msg = obj;
@@ -610,45 +585,63 @@
 	return new_dev_state;
 }
 
-static void handle_devstate_change(struct devstate_change *sc)
+static enum ast_device_state get_agg_state(char *device)
 {
 	RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
 	struct ast_devstate_aggregate agg;
+
+	ast_devstate_aggregate_init(&agg);
+
+	cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+
+	ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
+
+	return ast_devstate_aggregate_result(&agg);
+}
+
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
 	enum ast_device_state agg_state;
-	char *device = (char *)sc->dev_state->device;
-	RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
+	char *device;
+	struct stasis_device_state *dev_state;
 	RAII_VAR(struct stasis_message *, new_agg_msg, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_device_state *, new_agg_state, NULL, ao2_cleanup);
 
-	ast_devstate_aggregate_init(&agg);
+	if (stasis_device_state() != stasis_message_type(msg)) {
+		return;
+	}
+
+	dev_state = stasis_message_data(msg);
+
+	device = (char *)dev_state->device;
 
 	ast_debug(1, "Processing device state change for '%s'\n", device);
 
-	cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
-
-	ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
-
-	agg_state = ast_devstate_aggregate_result(&agg);
-
-	cached_agg_msg = stasis_cache_get(stasis_device_state_topic_cached(), stasis_device_state(), device);
-
-	ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
-		ast_devstate2str(agg_state), device);
-
-	if (cached_agg_msg) {
-		struct stasis_device_state *cached_agg_dev_state = stasis_message_data(cached_agg_msg);
-		if (cached_agg_dev_state->state == agg_state) {
-			/* No change since last reported device state */
-			ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
-				sc->dev_state->device, ast_devstate2str(agg_state));
-			return;
+	if (dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
+		/* if it's not cachable, there will be no aggregate state to get
+		 * and this should be passed through */
+		agg_state = dev_state->state;
+	} else {
+		RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
+
+		agg_state = get_agg_state(device);
+
+		ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
+			ast_devstate2str(agg_state), device);
+	
+		cached_agg_msg = stasis_cache_get(stasis_device_state_topic_cached(), stasis_device_state(), device);
+		if (cached_agg_msg) {
+			struct stasis_device_state *cached_agg_dev_state = stasis_message_data(cached_agg_msg);
+			if (cached_agg_dev_state->state == agg_state) {
+				/* No change since last reported device state */
+				ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
+					device, ast_devstate2str(agg_state));
+				return;
+			}
 		}
 	}
 
-	new_agg_state = device_state_alloc(device,
-					sc->dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE ? sc->dev_state->state : agg_state,
-					sc->dev_state->cachable,
-					NULL);
+	new_agg_state = device_state_alloc(device, agg_state, dev_state->cachable, NULL);
 
 	ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
 		device, ast_devstate2str(new_agg_state->state));
@@ -659,45 +652,6 @@
 
 	ast_assert(stasis_device_state_topic(device) != NULL);
 	stasis_publish(stasis_device_state_topic(device), new_agg_msg);
-}
-
-static void *run_devstate_collector(void *data)
-{
-	for (;;) {
-		struct devstate_change *sc;
-
-		ast_mutex_lock(&devstate_collector.lock);
-		while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry)))
-			ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock);
-		ast_mutex_unlock(&devstate_collector.lock);
-
-		handle_devstate_change(sc);
-
-		destroy_devstate_change(sc);
-	}
-
-	return NULL;
-}
-
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
-{
-	struct devstate_change *sc;
-
-	if (stasis_device_state() != stasis_message_type(msg)) {
-		return;
-	}
-
-	if (!(sc = ast_calloc(1, sizeof(*sc)))) {
-		return;
-	}
-
-	sc->dev_state = stasis_message_data(msg);
-	ao2_ref(sc->dev_state, +1);
-
-	ast_mutex_lock(&devstate_collector.lock);
-	AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry);
-	ast_cond_signal(&devstate_collector.cond);
-	ast_mutex_unlock(&devstate_collector.lock);
 }
 
 /*! \brief Initialize the device state engine in separate thread */
@@ -913,19 +867,12 @@
 		return -1;
 	}
 
-	devstate_collector.event_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
-
-	if (!devstate_collector.event_sub) {
+	devstate_message_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
+
+	if (!devstate_message_sub) {
 		ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
 		return -1;
 	}
 
-	ast_mutex_init(&devstate_collector.lock);
-	ast_cond_init(&devstate_collector.cond, NULL);
-	if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) {
-		ast_log(LOG_ERROR, "Unable to start device state collector thread.\n");
-		return -1;
-	}
-
 	return 0;
 }

Modified: team/kmoore/stasis-device_state/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/pbx.c?view=diff&rev=383029&r1=383028&r2=383029
==============================================================================
--- team/kmoore/stasis-device_state/main/pbx.c (original)
+++ team/kmoore/stasis-device_state/main/pbx.c Wed Mar 13 10:47:46 2013
@@ -1129,11 +1129,6 @@
 	char *message;
 };
 
-struct statechange {
-	AST_LIST_ENTRY(statechange) entry;
-	struct stasis_device_state *dev_state;
-};
-
 struct pbx_exception {
 	AST_DECLARE_STRING_FIELDS(
 		AST_STRING_FIELD(context);	/*!< Context associated with this exception */
@@ -5255,34 +5250,36 @@
 	ao2_iterator_destroy(&iter);
 }
 
-static int handle_statechange(void *datap)
-{
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
+	struct stasis_device_state *dev_state;
 	struct ast_hint *hint;
 	struct ast_str *hint_app;
 	struct ast_hintdevice *device;
 	struct ast_hintdevice *cmpdevice;
-	struct statechange *sc = datap;
 	struct ao2_iterator *dev_iter;
 	struct ao2_iterator cb_iter;
 	char context_name[AST_MAX_CONTEXT];
 	char exten_name[AST_MAX_EXTENSION];
 
+	if (stasis_device_state() != stasis_message_type(msg)) {
+		return;
+	}
+
+	dev_state = stasis_message_data(msg);
+
 	if (ao2_container_count(hintdevices) == 0) {
 		/* There are no hints monitoring devices. */
-		ao2_cleanup(sc->dev_state);
-		ast_free(sc);
-		return 0;
+		return;
 	}
 
 	hint_app = ast_str_create(1024);
 	if (!hint_app) {
-		ao2_cleanup(sc->dev_state);
-		ast_free(sc);
-		return -1;
-	}
-
-	cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(sc->dev_state->device));
-	strcpy(cmpdevice->hintdevice, sc->dev_state->device);
+		return;
+	}
+
+	cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(dev_state->device));
+	strcpy(cmpdevice->hintdevice, dev_state->device);
 
 	ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */
 	dev_iter = ao2_t_callback(hintdevices,
@@ -5293,9 +5290,7 @@
 	if (!dev_iter) {
 		ast_mutex_unlock(&context_merge_lock);
 		ast_free(hint_app);
-		ao2_cleanup(sc->dev_state);
-		ast_free(sc);
-		return -1;
+		return;
 	}
 
 	for (; (device = ao2_iterator_next(dev_iter)); ao2_t_ref(device, -1, "Next device")) {
@@ -5392,9 +5387,7 @@
 
 	ao2_iterator_destroy(dev_iter);
 	ast_free(hint_app);
-	ao2_cleanup(sc->dev_state);
-	ast_free(sc);
-	return 0;
+	return;
 }
 
 /*!
@@ -8858,7 +8851,7 @@
 
 	/*
 	 * Notify watchers of all removed hints with the same lock
-	 * environment as handle_statechange().
+	 * environment as device_state_cb().
 	 */
 	while ((saved_hint = AST_LIST_REMOVE_HEAD(&hints_removed, list))) {
 		/* this hint has been removed, notify the watchers */
@@ -11719,26 +11712,6 @@
 	}
 }
 
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
-{
-	struct statechange *sc;
-
-	if (stasis_device_state() != stasis_message_type(msg)) {
-		return;
-	}
-
-	if (!(sc = ast_calloc(1, sizeof(*sc)))) {
-		return;
-	}
-
-	sc->dev_state = stasis_message_data(msg);
-	ao2_ref(sc->dev_state, +1);
-	if (ast_taskprocessor_push(extension_state_tps, handle_statechange, sc) < 0) {
-		ao2_cleanup(sc->dev_state);
-		ast_free(sc);
-	}
-}
-
 /*!
  * \internal
  * \brief Implements the hints data provider.




More information about the asterisk-commits mailing list