[asterisk-commits] kmoore: branch kmoore/stasis-device_state r383037 - in /team/kmoore/stasis-de...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Mar 13 14:02:34 CDT 2013
Author: kmoore
Date: Wed Mar 13 14:02:30 2013
New Revision: 383037
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=383037
Log:
Collapse the device state topics/caches into one.
This makes the stasis_device_state_topic_all() topic your one-stop shop
for all things device state.
Modified:
team/kmoore/stasis-device_state/apps/app_queue.c
team/kmoore/stasis-device_state/include/asterisk/devicestate.h
team/kmoore/stasis-device_state/main/ccss.c
team/kmoore/stasis-device_state/main/devicestate.c
team/kmoore/stasis-device_state/main/pbx.c
team/kmoore/stasis-device_state/res/res_jabber.c
team/kmoore/stasis-device_state/res/res_xmpp.c
Modified: team/kmoore/stasis-device_state/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/apps/app_queue.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/apps/app_queue.c (original)
+++ team/kmoore/stasis-device_state/apps/app_queue.c Wed Mar 13 14:02:30 2013
@@ -1749,6 +1749,10 @@
}
dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
+ return;
+ }
qiter = ao2_iterator_init(queues, 0);
while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
Modified: team/kmoore/stasis-device_state/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/include/asterisk/devicestate.h?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/include/asterisk/devicestate.h (original)
+++ team/kmoore/stasis-device_state/include/asterisk/devicestate.h Wed Mar 13 14:02:30 2013
@@ -280,25 +280,9 @@
AST_STRING_FIELD(device); /*!< The name of the device */
);
enum ast_device_state state; /*!< The state of the device */
- struct ast_eid eid; /*!< The EID of the server where this message originated */
+ struct ast_eid *eid; /*!< The EID of the server where this message originated, NULL EID means aggregate state */
enum ast_devstate_cache cachable; /*!< Flag designating the cachability of this device state */
};
-
-/*!
- * \brief Get the Stasis topic for cluster-wide device state messages.
- * \retval The topic for device state messages with EID
- * \retval NULL if it has not been allocated
- * \since 12
- */
-struct stasis_topic *stasis_device_state_cluster_topic_all(void);
-
-/*!
- * \brief Get the Stasis caching topic for cluster-wide device state messages
- * \retval The caching topic for MWI messages
- * \retval NULL if it has not been allocated
- * \since 12
- */
-struct stasis_caching_topic *stasis_device_state_cluster_topic_cached(void);
/*!
* \brief Get the Stasis topic for aggregated device state messages
@@ -351,7 +335,7 @@
* \since 12
*/
#define stasis_publish_device_state(device, state, cachable) \
- stasis_publish_device_state_full(device, state, cachable, NULL)
+ stasis_publish_device_state_full(device, state, cachable, &ast_eid_default)
/*!
* \brief Publish a device state update with EID
Modified: team/kmoore/stasis-device_state/main/ccss.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/ccss.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/main/ccss.c (original)
+++ team/kmoore/stasis-device_state/main/ccss.c Wed Mar 13 14:02:30 2013
@@ -1297,10 +1297,10 @@
static int generic_monitor_devstate_tp_cb(void *data)
{
- struct stasis_device_state *gtcd = data;
- enum ast_device_state new_state = gtcd->state;
- enum ast_device_state previous_state = gtcd->state;
- const char *monitor_name = gtcd->device;
+ struct stasis_device_state *dev_state = data;
+ enum ast_device_state new_state = dev_state->state;
+ enum ast_device_state previous_state = dev_state->state;
+ const char *monitor_name = dev_state->device;
struct generic_monitor_instance_list *generic_list;
struct generic_monitor_instance *generic_instance;
@@ -1309,14 +1309,14 @@
* time between subscribing to its device state and the time this executes.
* Not really a big deal.
*/
- ao2_cleanup(gtcd);
+ ao2_cleanup(dev_state);
return 0;
}
if (generic_list->current_state == new_state) {
/* The device state hasn't actually changed, so we don't really care */
cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
- ao2_cleanup(gtcd);
+ ao2_cleanup(dev_state);
return 0;
}
@@ -1336,7 +1336,7 @@
}
}
cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
- ao2_cleanup(gtcd);
+ ao2_cleanup(dev_state);
return 0;
}
@@ -1347,17 +1347,21 @@
* so that all monitor operations can be serialized. Locks?! We don't need
* no steenkin' locks!
*/
- struct stasis_device_state *gtcd;
+ struct stasis_device_state *dev_state;
if (stasis_device_state() != stasis_message_type(msg)) {
return;
}
- gtcd = stasis_message_data(msg);
-
- ao2_ref(gtcd, +1);
- if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, gtcd)) {
- ao2_cleanup(gtcd);
- }
+ dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
+ return;
+ }
+
+ if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, dev_state)) {
+ return;
+ }
+ ao2_ref(dev_state, +1);
}
int ast_cc_available_timer_expire(const void *data)
@@ -2623,13 +2627,19 @@
{
struct ast_cc_agent *agent = userdata;
enum ast_device_state new_state;
- struct stasis_device_state *dev_state = stasis_message_data(msg);
+ struct stasis_device_state *dev_state;
struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
if (stasis_subscription_final_message(sub, msg)) {
cc_unref(agent, "Done holding ref for subscription");
return;
} else if (stasis_device_state() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
return;
}
Modified: team/kmoore/stasis-device_state/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/devicestate.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/main/devicestate.c (original)
+++ team/kmoore/stasis-device_state/main/devicestate.c Wed Mar 13 14:02:30 2013
@@ -193,8 +193,6 @@
struct stasis_subscription *devstate_message_sub;
-static struct stasis_topic *__device_state_cluster_topic_all;
-static struct stasis_caching_topic *__device_state_cluster_topic_cached;
static struct stasis_topic *__device_state_topic_all;
static struct stasis_caching_topic *__device_state_topic_cached;
static struct stasis_message_type *__device_state_message_type;
@@ -549,7 +547,8 @@
char *device = data;
struct stasis_device_state *dev_state = stasis_message_data(msg);
- if (strcmp(device, dev_state->device)) {
+ if (!dev_state->eid || strcmp(device, dev_state->device)) {
+ /* ignore aggregate states and devices that don't match */
return 0;
}
ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
@@ -562,6 +561,7 @@
{
struct stasis_device_state *device_state = obj;
ast_string_field_free_memory(device_state);
+ ast_free(device_state->eid);
}
static struct stasis_device_state *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cached, const struct ast_eid *eid)
@@ -575,10 +575,23 @@
ast_string_field_set(new_dev_state, device, device);
new_dev_state->state = state;
+
if (eid) {
- new_dev_state->eid = *eid;
+ char eid_str[20];
+ struct ast_str *uniqueid = ast_str_alloca(256);
+
+ new_dev_state->eid = ast_malloc(sizeof(*eid));
+ if (!new_dev_state->eid) {
+ return NULL;
+ }
+
+ *new_dev_state->eid = *eid;
+ ast_eid_to_str(eid_str, sizeof(eid_str), new_dev_state->eid);
+ ast_str_set(&uniqueid, 0, "%s%s", eid_str, device);
+ ast_string_field_set(new_dev_state, uniqueid, ast_str_buffer(uniqueid));
} else {
- ast_set_default_eid(&new_dev_state->eid);
+ /* no EID makes this an aggregate state */
+ ast_string_field_set(new_dev_state, uniqueid, device);
}
ao2_ref(new_dev_state, +1);
@@ -592,7 +605,7 @@
ast_devstate_aggregate_init(&agg);
- cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+ cached = stasis_cache_dump(stasis_device_state_topic_cached(), NULL);
ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &agg, device);
@@ -613,8 +626,12 @@
dev_state = stasis_message_data(msg);
+ if (!dev_state->eid) {
+ /* ignore aggregate messages */
+ return;
+ }
+
device = (char *)dev_state->device;
-
ast_debug(1, "Processing device state change for '%s'\n", device);
if (dev_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
@@ -625,7 +642,6 @@
RAII_VAR(struct stasis_message *, cached_agg_msg, NULL, ao2_cleanup);
agg_state = get_agg_state(device);
-
ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
ast_devstate2str(agg_state), device);
@@ -641,17 +657,10 @@
}
}
- new_agg_state = device_state_alloc(device, agg_state, dev_state->cachable, NULL);
-
ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
- device, ast_devstate2str(new_agg_state->state));
-
- ast_string_field_set(new_agg_state, uniqueid, device);
-
- new_agg_msg = stasis_message_create(stasis_device_state(), new_agg_state);
-
- ast_assert(stasis_device_state_topic(device) != NULL);
- stasis_publish(stasis_device_state_topic(device), new_agg_msg);
+ device, ast_devstate2str(agg_state));
+
+ stasis_publish_device_state_full(device, agg_state, dev_state->cachable, NULL);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -704,16 +713,6 @@
return agg->state;
}
-struct stasis_topic *stasis_device_state_cluster_topic_all(void)
-{
- return __device_state_cluster_topic_all;
-}
-
-struct stasis_caching_topic *stasis_device_state_cluster_topic_cached(void)
-{
- return __device_state_cluster_topic_cached;
-}
-
struct stasis_topic *stasis_device_state_topic_all(void)
{
return __device_state_topic_all;
@@ -771,8 +770,6 @@
{
RAII_VAR(struct stasis_device_state *, device_state, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
- struct ast_str *uniqueid = ast_str_alloca(256);
- char eid_str[20];
ast_assert(!ast_strlen_zero(device));
@@ -780,16 +777,11 @@
if (!device_state) {
return -1;
}
- ast_eid_to_str(eid_str, sizeof(eid_str), &device_state->eid);
-
- ast_str_set(&uniqueid, 0, "%s%s", eid_str, device);
- ast_string_field_set(device_state, uniqueid, ast_str_buffer(uniqueid));
message = stasis_message_create(stasis_device_state(), device_state);
- ast_assert(stasis_device_state_topic(ast_str_buffer(uniqueid)) != NULL);
- stasis_publish(stasis_device_state_cluster_topic_all(), message);
-
+ ast_assert(stasis_device_state_topic(device) != NULL);
+ stasis_publish(stasis_device_state_topic(device), message);
return 0;
}
@@ -823,10 +815,6 @@
static void devstate_exit(void)
{
- ao2_cleanup(__device_state_cluster_topic_all);
- __device_state_cluster_topic_all = NULL;
- stasis_caching_unsubscribe(__device_state_cluster_topic_cached);
- __device_state_cluster_topic_cached = NULL;
ao2_cleanup(__device_state_topic_all);
__device_state_topic_all = NULL;
stasis_caching_unsubscribe(__device_state_topic_cached);
@@ -842,14 +830,6 @@
int devstate_init(void)
{
ast_register_atexit(devstate_exit);
- __device_state_cluster_topic_all = stasis_topic_create("stasis_device_state_cluster_topic");
- if (!__device_state_cluster_topic_all) {
- return -1;
- }
- __device_state_cluster_topic_cached = stasis_caching_topic_create(__device_state_cluster_topic_all, device_state_get_id);
- if (!__device_state_cluster_topic_cached) {
- return -1;
- }
__device_state_topic_all = stasis_topic_create("stasis_device_state_topic");
if (!__device_state_topic_all) {
return -1;
@@ -867,7 +847,7 @@
return -1;
}
- devstate_message_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), devstate_change_collector_cb, NULL);
+ devstate_message_sub = stasis_subscribe(stasis_device_state_topic_all(), devstate_change_collector_cb, NULL);
if (!devstate_message_sub) {
ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
Modified: team/kmoore/stasis-device_state/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/main/pbx.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/main/pbx.c (original)
+++ team/kmoore/stasis-device_state/main/pbx.c Wed Mar 13 14:02:30 2013
@@ -5267,6 +5267,10 @@
}
dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
+ return;
+ }
if (ao2_container_count(hintdevices) == 0) {
/* There are no hints monitoring devices. */
Modified: team/kmoore/stasis-device_state/res/res_jabber.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/res/res_jabber.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/res/res_jabber.c (original)
+++ team/kmoore/stasis-device_state/res/res_jabber.c Wed Mar 13 14:02:30 2013
@@ -3284,8 +3284,8 @@
}
dev_state = stasis_message_data(msg);
- if (ast_eid_cmp(&ast_eid_default, &dev_state->eid)) {
- /* If the event didn't originate from this server, don't send it back out. */
+ if (!dev_state->eid || ast_eid_cmp(&ast_eid_default, dev_state->eid)) {
+ /* If the event is aggregate or didn't originate from this server, don't send it out. */
return;
}
@@ -3314,9 +3314,9 @@
if (!device_state_sub) {
RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
client = ASTOBJ_REF(client);
- device_state_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(),
+ device_state_sub = stasis_subscribe(stasis_device_state_topic_all(),
aji_devstate_cb, client);
- cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+ cached = stasis_cache_dump(stasis_device_state_topic_cached(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);
}
Modified: team/kmoore/stasis-device_state/res/res_xmpp.c
URL: http://svnview.digium.com/svn/asterisk/team/kmoore/stasis-device_state/res/res_xmpp.c?view=diff&rev=383037&r1=383036&r2=383037
==============================================================================
--- team/kmoore/stasis-device_state/res/res_xmpp.c (original)
+++ team/kmoore/stasis-device_state/res/res_xmpp.c Wed Mar 13 14:02:30 2013
@@ -1359,8 +1359,8 @@
}
dev_state = stasis_message_data(msg);
- if (ast_eid_cmp(&ast_eid_default, &dev_state->eid)) {
- /* If the event didn't originate from this server, don't send it back out. */
+ if (!dev_state->eid || ast_eid_cmp(&ast_eid_default, dev_state->eid)) {
+ /* If the event is aggregate or didn't originate from this server, don't send it out. */
return;
}
@@ -1602,14 +1602,14 @@
return;
}
- if (!(client->device_state_sub = stasis_subscribe(stasis_device_state_cluster_topic_all(), xmpp_pubsub_devstate_cb, client))) {
+ if (!(client->device_state_sub = stasis_subscribe(stasis_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) {
ast_event_unsubscribe(client->mwi_sub);
client->mwi_sub = NULL;
return;
}
ao2_ref(client, +1);
- cached = stasis_cache_dump(stasis_device_state_cluster_topic_cached(), NULL);
+ cached = stasis_cache_dump(stasis_device_state_topic_cached(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);
xmpp_pubsub_subscribe(client, "device_state");
More information about the asterisk-commits mailing list