[asterisk-commits] kmoore: branch kmoore/stasis-device_state r383029 - in /team/kmoore/stasis-de...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Mar 13 10:47:50 CDT 2013
Author: kmoore
Date: Wed Mar 13 10:47:46 2013
New Revision: 383029
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383029
Log:
Rip out now-redundant task processors
Several of the task processors used by device state callbacks are no
longer necessary since Stasis guarantees serialized invocations of
callbacks and uses a thread pool to service callbacks. This includes
usage of the extension state taskprocessor by device state code,
complete removal of the devstate_collector pseudo-taskprocessor, and
complete removal of app_queue's device state task processor.
Modified:
team/kmoore/stasis-device_state/apps/app_queue.c
team/kmoore/stasis-device_state/main/devicestate.c
team/kmoore/stasis-device_state/main/pbx.c
Modified: team/kmoore/stasis-device_state/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/apps/app_queue.c?view=diff&rev=383029&r1=383028&r2=383029
==============================================================================
--- team/kmoore/stasis-device_state/apps/app_queue.c (original)
+++ team/kmoore/stasis-device_state/apps/app_queue.c Wed Mar 13 10:47:46 2013
@@ -990,9 +990,6 @@
{ QUEUE_AUTOPAUSE_ALL,"all" },
};
-
-static struct ast_taskprocessor *devicestate_tps;
-
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
#define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */
@@ -1618,11 +1615,6 @@
return -1;
}
-struct statechange {
- AST_LIST_ENTRY(statechange) entry;
- struct stasis_device_state *dev_state;
-};
-
/*! \brief set a member's status based on device state of that member's state_interface.
*
* Lock interface list find sc, iterate through each queues queue_member list for member to
@@ -1741,10 +1733,10 @@
}
/*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(void *datap)
-{
- struct statechange *sc = datap;
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
struct ao2_iterator miter, qiter;
+ struct stasis_device_state *dev_state;
struct member *m;
struct call_queue *q;
char interface[80], *slash_pos;
@@ -1752,6 +1744,12 @@
int found_member; /* Found this member in this queue */
int avail = 0; /* Found an available member in this queue */
+ if (stasis_device_state() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+
qiter = ao2_iterator_init(queues, 0);
while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
ao2_lock(q);
@@ -1769,9 +1767,9 @@
}
}
- if (!strcasecmp(interface, sc->dev_state->device)) {
+ if (!strcasecmp(interface, dev_state->device)) {
found_member = 1;
- update_status(q, m, sc->dev_state->state);
+ update_status(q, m, dev_state->state);
}
}
@@ -1804,42 +1802,17 @@
if (found) {
ast_debug(1, "Device '%s' changed to state '%d' (%s)\n",
- sc->dev_state->device,
- sc->dev_state->state,
- ast_devstate2str(sc->dev_state->state));
+ dev_state->device,
+ dev_state->state,
+ ast_devstate2str(dev_state->state));
} else {
ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
- sc->dev_state->device,
- sc->dev_state->state,
- ast_devstate2str(sc->dev_state->state));
- }
-
- ao2_cleanup(sc->dev_state);
- sc->dev_state = NULL;
- ast_free(sc);
- return 0;
-}
-
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
-{
- struct statechange *sc;
- size_t datapsize;
-
- if (stasis_device_state() != stasis_message_type(msg)) {
- return;
- }
-
- datapsize = sizeof(*sc);
- if (!(sc = ast_calloc(1, datapsize))) {
- ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
- return;
- }
-
- sc->dev_state = stasis_message_data(msg);
- ao2_ref(sc->dev_state, +1);
- if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
- ast_free(sc);
- }
+ dev_state->device,
+ dev_state->state,
+ ast_devstate2str(dev_state->state));
+ }
+
+ return;
}
/*! \brief Helper function which converts from extension state to device state values */
@@ -9888,7 +9861,6 @@
queue_t_unref(q, "Done with iterator");
}
ao2_iterator_destroy(&q_iter);
- devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
ao2_ref(queues, -1);
ast_unload_realtime("queue_members");
return res;
@@ -9949,10 +9921,6 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
- ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
- }
-
/* in the following subscribe call, do I use DEVICE_STATE, or DEVICE_STATE_CHANGE? */
if (!(device_state_sub = stasis_subscribe(stasis_device_state_topic_all(), device_state_cb, NULL))) {
res = -1;
Modified: team/kmoore/stasis-device_state/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/devicestate.c?view=diff&rev=383029&r1=383028&r2=383029
==============================================================================
--- team/kmoore/stasis-device_state/main/devicestate.c (original)
+++ team/kmoore/stasis-device_state/main/devicestate.c Wed Mar 13 10:47:46 2013
@@ -191,20 +191,7 @@
/*! \brief Flag for the queue */
static ast_cond_t change_pending;
-struct devstate_change {
- AST_LIST_ENTRY(devstate_change) entry;
- struct stasis_device_state *dev_state;
-};
-
-static struct {
- pthread_t thread;
- struct stasis_subscription *event_sub;
- ast_cond_t cond;
- ast_mutex_t lock;
- AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q;
-} devstate_collector = {
- .thread = AST_PTHREADT_NULL,
-};
+struct stasis_subscription *devstate_message_sub;
static struct stasis_topic *__device_state_cluster_topic_all;
static struct stasis_caching_topic *__device_state_cluster_topic_cached;
@@ -554,19 +541,7 @@
return NULL;
}
-static void destroy_devstate_change(struct devstate_change *sc)
-{
- ao2_cleanup(sc->dev_state);
- sc->dev_state = NULL;
- ast_free(sc);
-}
-
#define MAX_SERVERS 64
-struct change_collection {
- struct devstate_change states[MAX_SERVERS];
- size_t num_states;
-};
-
static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
{
struct stasis_message *msg = obj;
@@ -610,45 +585,63 @@
return new_dev_state;
}
-static void handle_devstate_change(struct devstate_change *sc)
+static enum ast_device_state get_agg_state(char *device)
{
RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
struct ast_devstate_aggregate agg;
+
+ ast_devstate_aggregate_init(&agg);
+
+ cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+
+ ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
+
+ return ast_devstate_aggregate_result(&agg);
+}
+
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
enum ast_device_state agg_state;
- char *device = (char *)sc->dev_state->device;
- RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
+ char *device;
+ struct stasis_device_state *dev_state;
RAII_VAR(struct stasis_message *, new_agg_msg, NULL, ao2_cleanup);
RAII_VAR(struct stasis_device_state *, new_agg_state, NULL, ao2_cleanup);
- ast_devstate_aggregate_init(&agg);
+ if (stasis_device_state() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+
+ device = (char *)dev_state->device;
ast_debug(1, "Processing device state change for '%s'\n", device);
- cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
-
- ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
-
- agg_state = ast_devstate_aggregate_result(&agg);
-
- cached_agg_msg = stasis_cache_get(stasis_device_state_topic_cached(), stasis_device_state(), device);
-
- ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
- ast_devstate2str(agg_state), device);
-
- if (cached_agg_msg) {
- struct stasis_device_state *cached_agg_dev_state = stasis_message_data(cached_agg_msg);
- if (cached_agg_dev_state->state == agg_state) {
- /* No change since last reported device state */
- ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
- sc->dev_state->device, ast_devstate2str(agg_state));
- return;
+ if (dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
+ /* if it's not cachable, there will be no aggregate state to get
+ * and this should be passed through */
+ agg_state = dev_state->state;
+ } else {
+ RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
+
+ agg_state = get_agg_state(device);
+
+ ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
+ ast_devstate2str(agg_state), device);
+
+ cached_agg_msg = stasis_cache_get(stasis_device_state_topic_cached(), stasis_device_state(), device);
+ if (cached_agg_msg) {
+ struct stasis_device_state *cached_agg_dev_state = stasis_message_data(cached_agg_msg);
+ if (cached_agg_dev_state->state == agg_state) {
+ /* No change since last reported device state */
+ ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
+ device, ast_devstate2str(agg_state));
+ return;
+ }
}
}
- new_agg_state = device_state_alloc(device,
- sc->dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE ? sc->dev_state->state : agg_state,
- sc->dev_state->cachable,
- NULL);
+ new_agg_state = device_state_alloc(device, agg_state, dev_state->cachable, NULL);
ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
device, ast_devstate2str(new_agg_state->state));
@@ -659,45 +652,6 @@
ast_assert(stasis_device_state_topic(device) != NULL);
stasis_publish(stasis_device_state_topic(device), new_agg_msg);
-}
-
-static void *run_devstate_collector(void *data)
-{
- for (;;) {
- struct devstate_change *sc;
-
- ast_mutex_lock(&devstate_collector.lock);
- while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry)))
- ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock);
- ast_mutex_unlock(&devstate_collector.lock);
-
- handle_devstate_change(sc);
-
- destroy_devstate_change(sc);
- }
-
- return NULL;
-}
-
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
-{
- struct devstate_change *sc;
-
- if (stasis_device_state() != stasis_message_type(msg)) {
- return;
- }
-
- if (!(sc = ast_calloc(1, sizeof(*sc)))) {
- return;
- }
-
- sc->dev_state = stasis_message_data(msg);
- ao2_ref(sc->dev_state, +1);
-
- ast_mutex_lock(&devstate_collector.lock);
- AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry);
- ast_cond_signal(&devstate_collector.cond);
- ast_mutex_unlock(&devstate_collector.lock);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -913,19 +867,12 @@
return -1;
}
- devstate_collector.event_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
-
- if (!devstate_collector.event_sub) {
+ devstate_message_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
+
+ if (!devstate_message_sub) {
ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
return -1;
}
- ast_mutex_init(&devstate_collector.lock);
- ast_cond_init(&devstate_collector.cond, NULL);
- if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) {
- ast_log(LOG_ERROR, "Unable to start device state collector thread.\n");
- return -1;
- }
-
return 0;
}
Modified: team/kmoore/stasis-device_state/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/pbx.c?view=diff&rev=383029&r1=383028&r2=383029
==============================================================================
--- team/kmoore/stasis-device_state/main/pbx.c (original)
+++ team/kmoore/stasis-device_state/main/pbx.c Wed Mar 13 10:47:46 2013
@@ -1129,11 +1129,6 @@
char *message;
};
-struct statechange {
- AST_LIST_ENTRY(statechange) entry;
- struct stasis_device_state *dev_state;
-};
-
struct pbx_exception {
AST_DECLARE_STRING_FIELDS(
AST_STRING_FIELD(context); /*!< Context associated with this exception */
@@ -5255,34 +5250,36 @@
ao2_iterator_destroy(&iter);
}
-static int handle_statechange(void *datap)
-{
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct stasis_device_state *dev_state;
struct ast_hint *hint;
struct ast_str *hint_app;
struct ast_hintdevice *device;
struct ast_hintdevice *cmpdevice;
- struct statechange *sc = datap;
struct ao2_iterator *dev_iter;
struct ao2_iterator cb_iter;
char context_name[AST_MAX_CONTEXT];
char exten_name[AST_MAX_EXTENSION];
+ if (stasis_device_state() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+
if (ao2_container_count(hintdevices) == 0) {
/* There are no hints monitoring devices. */
- ao2_cleanup(sc->dev_state);
- ast_free(sc);
- return 0;
+ return;
}
hint_app = ast_str_create(1024);
if (!hint_app) {
- ao2_cleanup(sc->dev_state);
- ast_free(sc);
- return -1;
- }
-
- cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(sc->dev_state->device));
- strcpy(cmpdevice->hintdevice, sc->dev_state->device);
+ return;
+ }
+
+ cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(dev_state->device));
+ strcpy(cmpdevice->hintdevice, dev_state->device);
ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */
dev_iter = ao2_t_callback(hintdevices,
@@ -5293,9 +5290,7 @@
if (!dev_iter) {
ast_mutex_unlock(&context_merge_lock);
ast_free(hint_app);
- ao2_cleanup(sc->dev_state);
- ast_free(sc);
- return -1;
+ return;
}
for (; (device = ao2_iterator_next(dev_iter)); ao2_t_ref(device, -1, "Next device")) {
@@ -5392,9 +5387,7 @@
ao2_iterator_destroy(dev_iter);
ast_free(hint_app);
- ao2_cleanup(sc->dev_state);
- ast_free(sc);
- return 0;
+ return;
}
/*!
@@ -8858,7 +8851,7 @@
/*
* Notify watchers of all removed hints with the same lock
- * environment as handle_statechange().
+ * environment as device_state_cb().
*/
while ((saved_hint = AST_LIST_REMOVE_HEAD(&hints_removed, list))) {
/* this hint has been removed, notify the watchers */
@@ -11719,26 +11712,6 @@
}
}
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
-{
- struct statechange *sc;
-
- if (stasis_device_state() != stasis_message_type(msg)) {
- return;
- }
-
- if (!(sc = ast_calloc(1, sizeof(*sc)))) {
- return;
- }
-
- sc->dev_state = stasis_message_data(msg);
- ao2_ref(sc->dev_state, +1);
- if (ast_taskprocessor_push(extension_state_tps, handle_statechange, sc) < 0) {
- ao2_cleanup(sc->dev_state);
- ast_free(sc);
- }
-}
-
/*!
* \internal
* \brief Implements the hints data provider.
More information about the asterisk-commits
mailing list