[asterisk-commits] kmoore: trunk r385860 - in /trunk: apps/ include/asterisk/ main/ res/ tests/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Apr 16 10:34:03 CDT 2013
Author: kmoore
Date: Tue Apr 16 10:33:59 2013
New Revision: 385860
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=385860
Log:
Move device state distribution to Stasis-core
In the move from Asterisk's event system to Stasis, this makes
distributed device state aggregation always-on, removes unnecessary
task processors where possible, and collapses aggregate and
non-aggregate states into a single cache for ease of retrieval. This
also removes an intermediary step in device state aggregation.
Review: https://reviewboard.asterisk.org/r/2389/
(closes issue ASTERISK-21101)
Patch-by: Kinsey Moore <kmoore at digium.com>
Modified:
trunk/apps/app_queue.c
trunk/include/asterisk/devicestate.h
trunk/include/asterisk/xmpp.h
trunk/main/asterisk.c
trunk/main/ccss.c
trunk/main/devicestate.c
trunk/main/pbx.c
trunk/res/res_jabber.c
trunk/res/res_xmpp.c
trunk/tests/test_devicestate.c
Modified: trunk/apps/app_queue.c
URL: http://svnview.digium.com/svn/asterisk/trunk/apps/app_queue.c?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/apps/app_queue.c (original)
+++ trunk/apps/app_queue.c Tue Apr 16 10:33:59 2013
@@ -990,9 +990,6 @@
{ QUEUE_AUTOPAUSE_ALL,"all" },
};
-
-static struct ast_taskprocessor *devicestate_tps;
-
#define DEFAULT_RETRY 5
#define DEFAULT_TIMEOUT 15
#define RECHECK 1 /*!< Recheck every second to see we we're at the top yet */
@@ -1037,8 +1034,8 @@
/*! \brief queues.conf [general] option */
static int shared_lastcall = 1;
-/*! \brief Subscription to device state change events */
-static struct ast_event_sub *device_state_sub;
+/*! \brief Subscription to device state change messages */
+static struct stasis_subscription *device_state_sub;
/*! \brief queues.conf [general] option */
static int update_cdr = 0;
@@ -1618,12 +1615,6 @@
return -1;
}
-struct statechange {
- AST_LIST_ENTRY(statechange) entry;
- int state;
- char dev[0];
-};
-
/*! \brief set a member's status based on device state of that member's state_interface.
*
* Lock interface list find sc, iterate through each queues queue_member list for member to
@@ -1742,10 +1733,10 @@
}
/*! \brief set a member's status based on device state of that member's interface*/
-static int handle_statechange(void *datap)
-{
- struct statechange *sc = datap;
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
struct ao2_iterator miter, qiter;
+ struct ast_device_state_message *dev_state;
struct member *m;
struct call_queue *q;
char interface[80], *slash_pos;
@@ -1753,6 +1744,16 @@
int found_member; /* Found this member in this queue */
int avail = 0; /* Found an available member in this queue */
+ if (ast_device_state_message_type() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
+ return;
+ }
+
qiter = ao2_iterator_init(queues, 0);
while ((q = ao2_t_iterator_next(&qiter, "Iterate over queues"))) {
ao2_lock(q);
@@ -1770,9 +1771,9 @@
}
}
- if (!strcasecmp(interface, sc->dev)) {
+ if (!strcasecmp(interface, dev_state->device)) {
found_member = 1;
- update_status(q, m, sc->state);
+ update_status(q, m, dev_state->state);
}
}
@@ -1804,39 +1805,18 @@
ao2_iterator_destroy(&qiter);
if (found) {
- ast_debug(1, "Device '%s' changed to state '%d' (%s)\n", sc->dev, sc->state, ast_devstate2str(sc->state));
+ ast_debug(1, "Device '%s' changed to state '%d' (%s)\n",
+ dev_state->device,
+ dev_state->state,
+ ast_devstate2str(dev_state->state));
} else {
- ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", sc->dev, sc->state, ast_devstate2str(sc->state));
- }
-
- ast_free(sc);
- return 0;
-}
-
-static void device_state_cb(const struct ast_event *event, void *unused)
-{
- enum ast_device_state state;
- const char *device;
- struct statechange *sc;
- size_t datapsize;
-
- state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
- device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
-
- if (ast_strlen_zero(device)) {
- ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
- return;
- }
- datapsize = sizeof(*sc) + strlen(device) + 1;
- if (!(sc = ast_calloc(1, datapsize))) {
- ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
- return;
- }
- sc->state = state;
- strcpy(sc->dev, device);
- if (ast_taskprocessor_push(devicestate_tps, handle_statechange, sc) < 0) {
- ast_free(sc);
- }
+ ast_debug(3, "Device '%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n",
+ dev_state->device,
+ dev_state->state,
+ ast_devstate2str(dev_state->state));
+ }
+
+ return;
}
/*! \brief Helper function which converts from extension state to device state values */
@@ -9876,8 +9856,9 @@
res |= ast_data_unregister(NULL);
- if (device_state_sub)
- ast_event_unsubscribe(device_state_sub);
+ if (device_state_sub) {
+ device_state_sub = stasis_unsubscribe(device_state_sub);
+ }
ast_extension_state_del(0, extension_state_cb);
@@ -9887,7 +9868,6 @@
queue_t_unref(q, "Done with iterator");
}
ao2_iterator_destroy(&q_iter);
- devicestate_tps = ast_taskprocessor_unreference(devicestate_tps);
ao2_ref(queues, -1);
ast_unload_realtime("queue_members");
return res;
@@ -9948,12 +9928,8 @@
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_custom_function_register(&queuememberpenalty_function);
- if (!(devicestate_tps = ast_taskprocessor_get("app_queue", 0))) {
- ast_log(LOG_WARNING, "devicestate taskprocessor reference failed - devicestate notifications will not occur\n");
- }
-
/* in the following subscribe call, do I use DEVICE_STATE, or DEVICE_STATE_CHANGE? */
- if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, "AppQueue Device state", NULL, AST_EVENT_IE_END))) {
+ if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) {
res = -1;
}
Modified: trunk/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/devicestate.h?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/include/asterisk/devicestate.h (original)
+++ trunk/include/asterisk/devicestate.h Tue Apr 16 10:33:59 2013
@@ -38,6 +38,7 @@
#define _ASTERISK_DEVICESTATE_H
#include "asterisk/channelstate.h"
+#include "asterisk/utils.h"
#if defined(__cplusplus) || defined(c_plusplus)
extern "C" {
@@ -270,19 +271,87 @@
};
/*!
- * \brief Enable distributed device state processing.
- *
- * \details
- * By default, Asterisk assumes that device state change events will only be
- * originating from one instance. If a module gets loaded and configured such
- * that multiple instances of Asterisk will be sharing device state, this
- * function should be called to enable distributed device state processing.
- * It is off by default to save on unnecessary processing.
- *
- * \retval 0 success
- * \retval -1 failure
- */
-int ast_enable_distributed_devstate(void);
+ * \brief The structure that contains device state
+ * \since 12
+ */
+struct ast_device_state_message {
+ AST_DECLARE_STRING_FIELDS(
+ AST_STRING_FIELD(cache_id); /*!< A unique ID used for hashing */
+ AST_STRING_FIELD(device); /*!< The name of the device */
+ );
+ enum ast_device_state state; /*!< The state of the device */
+ struct ast_eid *eid; /*!< The EID of the server where this message originated, NULL EID means aggregate state */
+ enum ast_devstate_cache cachable; /*!< Flag designating the cachability of this device state */
+};
+
+/*!
+ * \brief Get the Stasis topic for device state messages
+ * \retval The topic for device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_topic *ast_device_state_topic_all(void);
+
+/*!
+ * \brief Get the Stasis topic for device state messages for a specific device
+ * \param device The device for which to get the topic
+ * \retval The topic structure for device state messages for a given device
+ * \retval NULL if it failed to be found or allocated
+ * \since 12
+ */
+struct stasis_topic *ast_device_state_topic(const char *device);
+
+/*!
+ * \brief Get the Stasis caching topic for device state messages
+ * \retval The caching topic for device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_caching_topic *ast_device_state_topic_cached(void);
+
+/*!
+ * \brief Get the Stasis message type for device state messages
+ * \retval The message type for device state messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_message_type *ast_device_state_message_type(void);
+
+/*!
+ * \brief Initialize the device state core
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int devstate_init(void);
+
+/*!
+ * \brief Publish a device state update
+ * \param[in] device The device name
+ * \param[in] state The state of the device
+ * \param[in] cachable Whether the device state can be cached
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+#define ast_publish_device_state(device, state, cachable) \
+ ast_publish_device_state_full(device, state, cachable, &ast_eid_default)
+
+/*!
+ * \brief Publish a device state update with EID
+ * \param[in] device The device name
+ * \param[in] state The state of the device
+ * \param[in] cachable Whether the device state can be cached
+ * \param[in] eid The EID of the server that originally published the message
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int ast_publish_device_state_full(
+ const char *device,
+ enum ast_device_state state,
+ enum ast_devstate_cache cachable,
+ struct ast_eid *eid);
#if defined(__cplusplus) || defined(c_plusplus)
}
Modified: trunk/include/asterisk/xmpp.h
URL: http://svnview.digium.com/svn/asterisk/trunk/include/asterisk/xmpp.h?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/include/asterisk/xmpp.h (original)
+++ trunk/include/asterisk/xmpp.h Tue Apr 16 10:33:59 2013
@@ -47,6 +47,7 @@
#include "asterisk/linkedlists.h"
#include "asterisk/stringfields.h"
#include "asterisk/pbx.h"
+#include "asterisk/stasis.h"
/*
* As per RFC 3920 - section 3.1, the maximum length for a full Jabber ID
@@ -135,7 +136,7 @@
int timeout;
unsigned int reconnect:1; /*!< Reconnect this client */
struct stasis_subscription *mwi_sub; /*!< If distributing event information the MWI subscription */
- struct ast_event_sub *device_state_sub; /*!< If distributing event information the device state subscription */
+ struct stasis_subscription *device_state_sub; /*!< If distributing event information the device state subscription */
};
/*!
Modified: trunk/main/asterisk.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/asterisk.c?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/main/asterisk.c (original)
+++ trunk/main/asterisk.c Tue Apr 16 10:33:59 2013
@@ -4180,6 +4180,11 @@
aco_init();
+ if (devstate_init()) {
+ printf("Device state core initialization failed.\n%s", term_quit());
+ exit(1);
+ }
+
if (app_init()) {
printf("App core initialization failed.\n%s", term_quit());
exit(1);
Modified: trunk/main/ccss.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/ccss.c?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/main/ccss.c (original)
+++ trunk/main/ccss.c Tue Apr 16 10:33:59 2013
@@ -1209,7 +1209,7 @@
* recalled
*/
int fit_for_recall;
- struct ast_event_sub *sub;
+ struct stasis_subscription *sub;
AST_LIST_HEAD_NOLOCK(, generic_monitor_instance) list;
};
@@ -1260,19 +1260,20 @@
struct generic_monitor_instance_list *generic_list = obj;
struct generic_monitor_instance *generic_instance;
- generic_list->sub = ast_event_unsubscribe(generic_list->sub);
+ generic_list->sub = stasis_unsubscribe(generic_list->sub);
while ((generic_instance = AST_LIST_REMOVE_HEAD(&generic_list->list, next))) {
ast_free(generic_instance);
}
ast_free((char *)generic_list->device_name);
}
-static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
{
struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
generic_monitor_instance_list_destructor, "allocate generic monitor instance list");
char * device_name;
+ struct stasis_topic *device_specific_topic;
if (!generic_list) {
return NULL;
@@ -1285,11 +1286,12 @@
ast_tech_to_upper(device_name);
generic_list->device_name = device_name;
- if (!(generic_list->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE,
- generic_monitor_devstate_cb, "Requesting CC", NULL,
- AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, monitor->interface->device_name,
- AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS,
- AST_EVENT_IE_END))) {
+ device_specific_topic = ast_device_state_topic(device_name);
+ if (!device_specific_topic) {
+ return NULL;
+ }
+
+ if (!(generic_list->sub = stasis_subscribe(device_specific_topic, generic_monitor_devstate_cb, NULL))) {
cc_unref(generic_list, "Failed to subscribe to device state");
return NULL;
}
@@ -1298,35 +1300,25 @@
return generic_list;
}
-struct generic_tp_cb_data {
- const char *device_name;
- enum ast_device_state new_state;
-};
-
static int generic_monitor_devstate_tp_cb(void *data)
{
- struct generic_tp_cb_data *gtcd = data;
- enum ast_device_state new_state = gtcd->new_state;
- enum ast_device_state previous_state = gtcd->new_state;
- const char *monitor_name = gtcd->device_name;
+ RAII_VAR(struct ast_device_state_message *, dev_state, data, ao2_cleanup);
+ enum ast_device_state new_state = dev_state->state;
+ enum ast_device_state previous_state;
struct generic_monitor_instance_list *generic_list;
struct generic_monitor_instance *generic_instance;
- if (!(generic_list = find_generic_monitor_instance_list(monitor_name))) {
+ if (!(generic_list = find_generic_monitor_instance_list(dev_state->device))) {
/* The most likely cause for this is that we destroyed the monitor in the
* time between subscribing to its device state and the time this executes.
* Not really a big deal.
*/
- ast_free((char *) gtcd->device_name);
- ast_free(gtcd);
return 0;
}
if (generic_list->current_state == new_state) {
/* The device state hasn't actually changed, so we don't really care */
cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
- ast_free((char *) gtcd->device_name);
- ast_free(gtcd);
return 0;
}
@@ -1346,33 +1338,31 @@
}
}
cc_unref(generic_list, "Kill reference of generic list in devstate taskprocessor callback");
- ast_free((char *) gtcd->device_name);
- ast_free(gtcd);
return 0;
}
-static void generic_monitor_devstate_cb(const struct ast_event *event, void *userdata)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
{
/* Wow, it's cool that we've picked up on a state change, but we really want
* the actual work to be done in the core's taskprocessor execution thread
* so that all monitor operations can be serialized. Locks?! We don't need
* no steenkin' locks!
*/
- struct generic_tp_cb_data *gtcd = ast_calloc(1, sizeof(*gtcd));
-
- if (!gtcd) {
+ struct ast_device_state_message *dev_state;
+ if (ast_device_state_message_type() != stasis_message_type(msg)) {
return;
}
- if (!(gtcd->device_name = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE)))) {
- ast_free(gtcd);
+ dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
return;
}
- gtcd->new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-
- if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, gtcd)) {
- ast_free((char *)gtcd->device_name);
- ast_free(gtcd);
+
+ ao2_t_ref(dev_state, +1, "Bumping dev_state ref for cc_core_taskprocessor");
+ if (ast_taskprocessor_push(cc_core_taskprocessor, generic_monitor_devstate_tp_cb, dev_state)) {
+ ao2_cleanup(dev_state);
+ return;
}
}
@@ -2502,7 +2492,7 @@
* device state of the caller in order to
* determine when we may move on
*/
- struct ast_event_sub *sub;
+ struct stasis_subscription *sub;
/*!
* Scheduler id of offer timer.
*/
@@ -2635,34 +2625,33 @@
return 0;
}
-static int generic_agent_devstate_unsubscribe(void *data)
-{
- struct ast_cc_agent *agent = data;
- struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
-
- if (generic_pvt->sub != NULL) {
- generic_pvt->sub = ast_event_unsubscribe(generic_pvt->sub);
- }
- cc_unref(agent, "Done unsubscribing from devstate");
- return 0;
-}
-
-static void generic_agent_devstate_cb(const struct ast_event *event, void *userdata)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
{
struct ast_cc_agent *agent = userdata;
enum ast_device_state new_state;
-
- new_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
+ struct ast_device_state_message *dev_state;
+ struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
+
+ if (stasis_subscription_final_message(sub, msg)) {
+ cc_unref(agent, "Done holding ref for subscription");
+ return;
+ } else if (ast_device_state_message_type() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
+ return;
+ }
+
+ new_state = dev_state->state;
if (!cc_generic_is_device_available(new_state)) {
/* Not interested in this new state of the device. It is still busy. */
return;
}
- /* We can't unsubscribe from device state events here because it causes a deadlock */
- if (ast_taskprocessor_push(cc_core_taskprocessor, generic_agent_devstate_unsubscribe,
- cc_ref(agent, "ref agent for device state unsubscription"))) {
- cc_unref(agent, "Unref agent unsubscribing from devstate failed");
- }
+ generic_pvt->sub = stasis_unsubscribe(sub);
ast_cc_agent_caller_available(agent->core_id, "%s is no longer busy", agent->device_name);
}
@@ -2670,18 +2659,21 @@
{
struct cc_generic_agent_pvt *generic_pvt = agent->private_data;
struct ast_str *str = ast_str_alloca(128);
+ struct stasis_topic *device_specific_topic;
ast_assert(generic_pvt->sub == NULL);
ast_str_set(&str, 0, "Agent monitoring %s device state since it is busy\n",
agent->device_name);
- if (!(generic_pvt->sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE,
- generic_agent_devstate_cb, ast_str_buffer(str), agent,
- AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, agent->device_name,
- AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_EXISTS,
- AST_EVENT_IE_END))) {
- return -1;
- }
+ device_specific_topic = ast_device_state_topic(agent->device_name);
+ if (!device_specific_topic) {
+ return -1;
+ }
+
+ if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) {
+ return -1;
+ }
+ cc_ref(agent, "Ref agent for subscription");
return 0;
}
@@ -2792,7 +2784,7 @@
cc_generic_agent_stop_offer_timer(agent);
if (agent_pvt->sub) {
- agent_pvt->sub = ast_event_unsubscribe(agent_pvt->sub);
+ agent_pvt->sub = stasis_unsubscribe(agent_pvt->sub);
}
ast_free(agent_pvt);
Modified: trunk/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/devicestate.c?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/main/devicestate.c (original)
+++ trunk/main/devicestate.c Tue Apr 16 10:33:59 2013
@@ -129,7 +129,12 @@
#include "asterisk/devicestate.h"
#include "asterisk/pbx.h"
#include "asterisk/app.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
#include "asterisk/event.h"
+#include "asterisk/devicestate.h"
+
+#define DEVSTATE_TOPIC_BUCKETS 57
/*! \brief Device state strings for printing */
static const char * const devstatestring[][2] = {
@@ -188,25 +193,12 @@
/*! \brief Flag for the queue */
static ast_cond_t change_pending;
-struct devstate_change {
- AST_LIST_ENTRY(devstate_change) entry;
- uint32_t state;
- struct ast_eid eid;
- enum ast_devstate_cache cachable;
- char device[1];
-};
-
-static struct {
- pthread_t thread;
- struct ast_event_sub *event_sub;
- ast_cond_t cond;
- ast_mutex_t lock;
- AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q;
- unsigned int enabled:1;
-} devstate_collector = {
- .thread = AST_PTHREADT_NULL,
- .enabled = 0,
-};
+struct stasis_subscription *devstate_message_sub;
+
+static struct stasis_topic *device_state_topic_all;
+static struct stasis_caching_topic *device_state_topic_cached;
+static struct stasis_message_type *device_state_message_type;
+static struct stasis_topic_pool *device_state_topic_pool;
/* Forward declarations */
static int getproviderstate(const char *provider, const char *address);
@@ -289,21 +281,16 @@
static enum ast_device_state devstate_cached(const char *device)
{
- enum ast_device_state res = AST_DEVICE_UNKNOWN;
- struct ast_event *event;
-
- event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
- AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
- AST_EVENT_IE_END);
-
- if (!event)
- return res;
-
- res = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-
- ast_event_destroy(event);
-
- return res;
+ RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
+ struct ast_device_state_message *device_state;
+
+ cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+ if (!cached_msg) {
+ return AST_DEVICE_UNKNOWN;
+ }
+ device_state = stasis_message_data(cached_msg);
+
+ return device_state->state;
}
/*! \brief Check device state through channel specific function or generic function */
@@ -426,39 +413,9 @@
return res;
}
-static void devstate_event(const char *device, enum ast_device_state state, int cachable)
-{
- struct ast_event *event;
- enum ast_event_type event_type;
-
- if (devstate_collector.enabled) {
- /* Distributed device state is enabled, so this state change is a change
- * for a single server, not the real state. */
- event_type = AST_EVENT_DEVICE_STATE_CHANGE;
- } else {
- event_type = AST_EVENT_DEVICE_STATE;
- }
-
- ast_debug(3, "device '%s' state '%d'\n", device, state);
-
- if (!(event = ast_event_new(event_type,
- AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
- AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
- AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, cachable,
- AST_EVENT_IE_END))) {
- return;
- }
-
- if (cachable) {
- ast_event_queue_and_cache(event);
- } else {
- ast_event_queue(event);
- }
-}
-
/*! Called by the state change thread to find out what the state is, and then
* to queue up the state change event */
-static void do_state_change(const char *device, int cachable)
+static void do_state_change(const char *device, enum ast_devstate_cache cachable)
{
enum ast_device_state state;
@@ -466,7 +423,7 @@
ast_debug(3, "Changing state for %s - state %d (%s)\n", device, state, ast_devstate2str(state));
- devstate_event(device, state, cachable);
+ ast_publish_device_state(device, state, cachable);
}
int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_cache cachable, const char *device)
@@ -490,7 +447,7 @@
*/
if (state != AST_DEVICE_UNKNOWN) {
- devstate_event(device, state, cachable);
+ ast_publish_device_state(device, state, cachable);
} else if (change_thread == AST_PTHREADT_NULL || !(change = ast_calloc(1, sizeof(*change) + strlen(device)))) {
/* we could not allocate a change struct, or */
/* there is no background thread, so process the change now */
@@ -562,176 +519,148 @@
return NULL;
}
-static void destroy_devstate_change(struct devstate_change *sc)
-{
- ast_free(sc);
-}
-
#define MAX_SERVERS 64
-struct change_collection {
- struct devstate_change states[MAX_SERVERS];
- size_t num_states;
-};
-
-static void devstate_cache_cb(const struct ast_event *event, void *data)
-{
- struct change_collection *collection = data;
- int i;
- const struct ast_eid *eid;
-
- if (collection->num_states == ARRAY_LEN(collection->states)) {
- ast_log(LOG_ERROR, "More per-server state values than we have room for (MAX_SERVERS is %d)\n",
- MAX_SERVERS);
+static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
+{
+ struct stasis_message *msg = obj;
+ struct ast_devstate_aggregate *aggregate = arg;
+ char *device = data;
+ struct ast_device_state_message *device_state = stasis_message_data(msg);
+
+ if (!device_state->eid || strcmp(device, device_state->device)) {
+ /* ignore aggregate states and devices that don't match */
+ return 0;
+ }
+ ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
+ ast_devstate2str(device_state->state), device);
+ ast_devstate_aggregate_add(aggregate, device_state->state);
+ return 0;
+}
+
+static void device_state_dtor(void *obj)
+{
+ struct ast_device_state_message *device_state = obj;
+ ast_string_field_free_memory(device_state);
+ ast_free(device_state->eid);
+}
+
+static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid)
+{
+ RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup);
+
+ if (!new_device_state || ast_string_field_init(new_device_state, 256)) {
+ return NULL;
+ }
+
+ ast_string_field_set(new_device_state, device, device);
+ new_device_state->state = state;
+ new_device_state->cachable = cachable;
+
+ if (eid) {
+ char eid_str[20];
+ struct ast_str *cache_id = ast_str_alloca(256);
+
+ new_device_state->eid = ast_malloc(sizeof(*eid));
+ if (!new_device_state->eid) {
+ return NULL;
+ }
+
+ *new_device_state->eid = *eid;
+ ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid);
+ ast_str_set(&cache_id, 0, "%s%s", eid_str, device);
+ ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id));
+ } else {
+ /* no EID makes this an aggregate state */
+ ast_string_field_set(new_device_state, cache_id, device);
+ }
+
+ ao2_ref(new_device_state, +1);
+ return new_device_state;
+}
+
+static enum ast_device_state get_aggregate_state(char *device)
+{
+ RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
+ struct ast_devstate_aggregate aggregate;
+
+ ast_devstate_aggregate_init(&aggregate);
+
+ cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
+
+ ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
+
+ return ast_devstate_aggregate_result(&aggregate);
+}
+
+static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state)
+{
+ RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
+ struct ast_device_state_message *cached_aggregate_device_state;
+
+ cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
+ if (!cached_aggregate_msg) {
+ return 1;
+ }
+
+ cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg);
+ if (cached_aggregate_device_state->state == new_aggregate_state) {
+ return 0;
+ }
+ return 1;
+}
+
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
+ enum ast_device_state aggregate_state;
+ char *device;
+ struct ast_device_state_message *device_state;
+ RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup);
+
+ if (stasis_cache_update_type() == stasis_message_type(msg)) {
+ struct stasis_cache_update *update = stasis_message_data(msg);
+ if (!update->new_snapshot) {
+ return;
+ }
+ msg = update->new_snapshot;
+ }
+
+ if (ast_device_state_message_type() != stasis_message_type(msg)) {
return;
}
- if (!(eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
- ast_log(LOG_ERROR, "Device state change event with no EID\n");
+ device_state = stasis_message_data(msg);
+
+ if (!device_state->eid) {
+ /* ignore aggregate messages */
return;
}
- i = collection->num_states;
-
- collection->states[i].state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
- collection->states[i].eid = *eid;
-
- collection->num_states++;
-}
-
-static void process_collection(const char *device, enum ast_devstate_cache cachable, struct change_collection *collection)
-{
- int i;
- struct ast_devstate_aggregate agg;
- enum ast_device_state state;
- struct ast_event *event;
-
- ast_devstate_aggregate_init(&agg);
-
- for (i = 0; i < collection->num_states; i++) {
- ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
- ast_devstate2str(collection->states[i].state), device);
- ast_devstate_aggregate_add(&agg, collection->states[i].state);
- }
-
- state = ast_devstate_aggregate_result(&agg);
-
- ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
- ast_devstate2str(state), device);
-
- event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
- AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
- AST_EVENT_IE_END);
-
- if (event) {
- enum ast_device_state old_state;
-
- old_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
-
- ast_event_destroy(event);
-
- if (state == old_state) {
+ device = ast_strdupa(device_state->device);
+ ast_debug(1, "Processing device state change for '%s'\n", device);
+
+ if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
+ /* if it's not cachable, there will be no aggregate state to get
+ * and this should be passed through */
+ aggregate_state = device_state->state;
+ } else {
+
+ aggregate_state = get_aggregate_state(device);
+ ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
+ ast_devstate2str(aggregate_state), device);
+
+ if (!aggregate_state_changed(device, aggregate_state)) {
/* No change since last reported device state */
ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
- device, ast_devstate2str(state));
+ device, ast_devstate2str(aggregate_state));
return;
}
}
ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
- device, ast_devstate2str(state));
-
- event = ast_event_new(AST_EVENT_DEVICE_STATE,
- AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
- AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
- AST_EVENT_IE_END);
-
- if (!event) {
- return;
- }
-
- if (cachable) {
- ast_event_queue_and_cache(event);
- } else {
- ast_event_queue(event);
- }
-}
-
-static void handle_devstate_change(struct devstate_change *sc)
-{
- struct ast_event_sub *tmp_sub;
- struct change_collection collection = {
- .num_states = 0,
- };
-
- ast_debug(1, "Processing device state change for '%s'\n", sc->device);
-
- if (!(tmp_sub = ast_event_subscribe_new(AST_EVENT_DEVICE_STATE_CHANGE, devstate_cache_cb, &collection))) {
- ast_log(LOG_ERROR, "Failed to create subscription\n");
- return;
- }
-
- if (ast_event_sub_append_ie_str(tmp_sub, AST_EVENT_IE_DEVICE, sc->device)) {
- ast_log(LOG_ERROR, "Failed to append device IE\n");
- ast_event_sub_destroy(tmp_sub);
- return;
- }
-
- /* Populate the collection of device states from the cache */
- ast_event_dump_cache(tmp_sub);
-
- process_collection(sc->device, sc->cachable, &collection);
-
- ast_event_sub_destroy(tmp_sub);
-}
-
-static void *run_devstate_collector(void *data)
-{
- for (;;) {
- struct devstate_change *sc;
-
- ast_mutex_lock(&devstate_collector.lock);
- while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry)))
- ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock);
- ast_mutex_unlock(&devstate_collector.lock);
-
- handle_devstate_change(sc);
-
- destroy_devstate_change(sc);
- }
-
- return NULL;
-}
-
-static void devstate_change_collector_cb(const struct ast_event *event, void *data)
-{
- struct devstate_change *sc;
- const char *device;
- const struct ast_eid *eid;
- uint32_t state;
- enum ast_devstate_cache cachable = AST_DEVSTATE_CACHABLE;
-
- device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
- state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
- cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
-
- if (ast_strlen_zero(device) || !eid) {
- ast_log(LOG_ERROR, "Invalid device state change event received\n");
- return;
- }
-
- if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device))))
- return;
-
- strcpy(sc->device, device);
- sc->eid = *eid;
- sc->state = state;
- sc->cachable = cachable;
-
- ast_mutex_lock(&devstate_collector.lock);
- AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry);
- ast_cond_signal(&devstate_collector.cond);
- ast_mutex_unlock(&devstate_collector.lock);
+ device, ast_devstate2str(aggregate_state));
+
+ ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -784,28 +713,106 @@
return agg->state;
}
-int ast_enable_distributed_devstate(void)
-{
- if (devstate_collector.enabled) {
- return 0;
- }
-
- devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
- devstate_change_collector_cb, "devicestate_engine_enable_distributed", NULL, AST_EVENT_IE_END);
-
- if (!devstate_collector.event_sub) {
+/*! \brief Accessor returning the file-level device_state_topic_all topic pointer (set up in devstate_init()) */
+struct stasis_topic *ast_device_state_topic_all(void)
+{
+ return device_state_topic_all;
+}
+
+/*! \brief Accessor returning the file-level device_state_topic_cached caching topic pointer (set up in devstate_init()) */
+struct stasis_caching_topic *ast_device_state_topic_cached(void)
+{
+ return device_state_topic_cached;
+}
+
+/*! \brief Accessor returning the file-level device_state_message_type pointer (set up in devstate_init()) */
+struct stasis_message_type *ast_device_state_message_type(void)
+{
+ return device_state_message_type;
+}
+
+/*!
+ * \brief Get the topic associated with a single device
+ * \param device Name passed as the key to device_state_topic_pool
+ * \return The result of stasis_topic_pool_get_topic() for that key; the
+ *         caller in this file treats a NULL result as failure
+ */
+struct stasis_topic *ast_device_state_topic(const char *device)
+{
+ return stasis_topic_pool_get_topic(device_state_topic_pool, device);
+}
+
+/*!
+ * \brief Build a device state message and publish it to the device's own topic
+ * \param device Device name; must be non-empty (asserted)
+ * \param state State to publish
+ * \param cachable Cachability flag stored with the message
+ * \param eid Passed through to device_state_alloc(); other code in this
+ *        change passes NULL here when publishing an aggregate state
+ * \retval 0 the message was handed to stasis_publish()
+ * \retval -1 the payload, the message or the device topic could not be obtained
+ */
+int ast_publish_device_state_full(
+ const char *device,
+ enum ast_device_state state,
+ enum ast_devstate_cache cachable,
+ struct ast_eid *eid)
+{
+ /* Both references are released by ao2_cleanup on every return path */
+ RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+ struct stasis_topic *device_specific_topic;
+
+ ast_assert(!ast_strlen_zero(device));
+
+ device_state = device_state_alloc(device, state, cachable, eid);
+ if (!device_state) {
+ return -1;
+ }
+
+ message = stasis_message_create(ast_device_state_message_type(), device_state);
+ if (!message) {
+ /* Never hand a NULL message to stasis_publish() or report success for it */
+ return -1;
+ }
+
+ device_specific_topic = ast_device_state_topic(device);
+ if (!device_specific_topic) {
+ return -1;
+ }
+
+ stasis_publish(device_specific_topic, message);
+ return 0;
+}
+
+/*!
+ * \brief Cache-id callback handed to stasis_caching_topic_create() in devstate_init()
+ * \param message Message to derive an id from
+ * \return device_state->cache_id for a device state message, or NULL when the
+ *         message is of another type or is marked AST_DEVSTATE_NOT_CACHABLE
+ */
+static const char *device_state_get_id(struct stasis_message *message)
+{
+ struct ast_device_state_message *device_state;
+ if (ast_device_state_message_type() != stasis_message_type(message)) {
+ return NULL;
+ }
+
+ device_state = stasis_message_data(message);
+ if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
+ return NULL;
+ }
+
+ return device_state->cache_id;
+}
+
+/*!
+ * \brief Exit-time teardown, registered with ast_register_atexit() at the end of devstate_init()
+ *
+ * Calls ao2_cleanup() on device_state_topic_all, device_state_message_type and
+ * device_state_topic_pool and sets each to NULL; device_state_topic_cached is
+ * replaced by the return value of stasis_caching_unsubscribe().
+ *
+ * NOTE(review): devstate_message_sub, created in devstate_init(), is not
+ * released here - confirm whether it needs an explicit unsubscribe.
+ */
+static void devstate_exit(void)
+{
+ ao2_cleanup(device_state_topic_all);
+ device_state_topic_all = NULL;
+ device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
+ ao2_cleanup(device_state_message_type);
+ device_state_message_type = NULL;
+ ao2_cleanup(device_state_topic_pool);
+ device_state_topic_pool = NULL;
+}
+
+/*!
+ * \brief Create the device state Stasis objects and subscribe the change collector
+ *
+ * In order: creates device_state_topic_all, the caching topic on top of it
+ * (keyed by device_state_get_id), the device state message type and the
+ * per-device topic pool, then subscribes devstate_change_collector_cb to the
+ * caching topic's topic. devstate_exit() is registered only after every step
+ * has succeeded.
+ *
+ * \retval 0 success
+ * \retval -1 a creation step or the subscription failed
+ *
+ * NOTE(review): the early failure returns do not release objects created by
+ * earlier steps - confirm callers treat failure as fatal.
+ */
+int devstate_init(void)
+{
+ device_state_topic_all = stasis_topic_create("ast_device_state_topic");
+ if (!device_state_topic_all) {
+ return -1;
+ }
+ device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id);
+ if (!device_state_topic_cached) {
+ return -1;
+ }
+ device_state_message_type = stasis_message_type_create("ast_device_state_message");
+ if (!device_state_message_type) {
+ return -1;
+ }
+ device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
+ if (!device_state_topic_pool) {
+ return -1;
+ }
+
+ devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL);
+
+ if (!devstate_message_sub) {
ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
return -1;
}
- ast_mutex_init(&devstate_collector.lock);
- ast_cond_init(&devstate_collector.cond, NULL);
- if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) {
- ast_log(LOG_ERROR, "Unable to start device state collector thread.\n");
- return -1;
- }
-
- devstate_collector.enabled = 1;
-
+ ast_register_atexit(devstate_exit);
return 0;
}
Modified: trunk/main/pbx.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/pbx.c?view=diff&rev=385860&r1=385859&r2=385860
==============================================================================
--- trunk/main/pbx.c (original)
+++ trunk/main/pbx.c Tue Apr 16 10:33:59 2013
@@ -1130,11 +1130,6 @@
char *message;
};
-struct statechange {
- AST_LIST_ENTRY(statechange) entry;
- char dev[0];
-};
-
struct pbx_exception {
AST_DECLARE_STRING_FIELDS(
AST_STRING_FIELD(context); /*!< Context associated with this exception */
@@ -1300,7 +1295,7 @@
static char *overrideswitch = NULL;
/*! \brief Subscription for device state change events */
-static struct ast_event_sub *device_state_sub;
+static struct stasis_subscription *device_state_sub;
/*! \brief Subscription for presence state change events */
static struct ast_event_sub *presence_state_sub;
@@ -5247,32 +5242,40 @@
ao2_iterator_destroy(&iter);
}
-static int handle_statechange(void *datap)
-{
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+{
+ struct ast_device_state_message *dev_state;
struct ast_hint *hint;
struct ast_str *hint_app;
struct ast_hintdevice *device;
struct ast_hintdevice *cmpdevice;
- struct statechange *sc = datap;
struct ao2_iterator *dev_iter;
struct ao2_iterator cb_iter;
char context_name[AST_MAX_CONTEXT];
char exten_name[AST_MAX_EXTENSION];
+ if (ast_device_state_message_type() != stasis_message_type(msg)) {
+ return;
+ }
+
+ dev_state = stasis_message_data(msg);
+ if (dev_state->eid) {
+ /* ignore non-aggregate states */
+ return;
+ }
+
if (ao2_container_count(hintdevices) == 0) {
/* There are no hints monitoring devices. */
- ast_free(sc);
- return 0;
+ return;
}
hint_app = ast_str_create(1024);
if (!hint_app) {
- ast_free(sc);
- return -1;
- }
-
- cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(sc->dev));
- strcpy(cmpdevice->hintdevice, sc->dev);
+ return;
+ }
+
+ cmpdevice = ast_alloca(sizeof(*cmpdevice) + strlen(dev_state->device));
+ strcpy(cmpdevice->hintdevice, dev_state->device);
ast_mutex_lock(&context_merge_lock);/* Hold off ast_merge_contexts_and_delete */
dev_iter = ao2_t_callback(hintdevices,
@@ -5283,8 +5286,7 @@
[... 563 lines stripped ...]
More information about the asterisk-commits
mailing list