[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r408986 - in /team/rmudgett/stasis_cac...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Feb 26 18:13:03 CST 2014
Author: rmudgett
Date: Wed Feb 26 18:12:58 2014
New Revision: 408986
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=408986
Log:
device state: Update device state to use the new cache scheme.
* Make device state use the new cache scheme.
* Reordered devicestate.c init/cleanup to match.
* Added aggregate_post_fn to the cache to post aggregate updates when they
change.
* Reduced redundant code in cache_put().
* Remove some unnecessary RAII_VAR() usage in devicestate.c and
test_devicestate.c.
* Simplified some coding in _ast_device_state().
* Made the device state unit test consumer ao2 object use the ao2 lock
instead of a redundant lock in the struct for ast_cond_wait().
* Made the device state unit test look for the aggregate device state in
the right place. This fixes the unit test.
Modified:
team/rmudgett/stasis_cache/include/asterisk/devicestate.h
team/rmudgett/stasis_cache/include/asterisk/stasis.h
team/rmudgett/stasis_cache/main/devicestate.c
team/rmudgett/stasis_cache/main/stasis_cache.c
team/rmudgett/stasis_cache/tests/test_devicestate.c
Modified: team/rmudgett/stasis_cache/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/devicestate.h?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/devicestate.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/devicestate.h Wed Feb 26 18:12:58 2014
@@ -273,13 +273,11 @@
* \brief The structure that contains device state
* \since 12
*/
+/* BUGBUG the changes to this struct should only be done on trunk. Back them out for v12 version. */
struct ast_device_state_message {
- AST_DECLARE_STRING_FIELDS(
- AST_STRING_FIELD(cache_id); /*!< A unique ID used for hashing */
- AST_STRING_FIELD(device); /*!< The name of the device */
- );
+ const char *device; /*!< The name of the device */
+ struct ast_eid *eid; /*!< The EID of the server where this message originated, NULL EID means aggregate state */
enum ast_device_state state; /*!< The state of the device */
- struct ast_eid *eid; /*!< The EID of the server where this message originated, NULL EID means aggregate state */
enum ast_devstate_cache cachable; /*!< Flag designating the cachability of this device state */
};
Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Wed Feb 26 18:12:58 2014
@@ -311,11 +311,14 @@
*
* \return New message
* \return \c NULL on error
+ *
+ * \since 12.2.0
*/
struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
/*!
* \brief Get the entity id for a \ref stasis_message.
+ * \since 12.2.0
*
* \param msg Message to get eid.
*
@@ -531,8 +534,8 @@
* \brief Create a subscription which forwards all messages from one topic to
* another.
*
- * Note that the \a topic parameter of the invoked callback will the be \a topic
- * the message was sent to, not the topic the subscriber subscribed to.
+ * Note that the \a topic parameter of the invoked callback will be the
+ * \a topic the message was sent to, not the topic the subscriber subscribed to.
*
* \param from_topic Topic to forward.
* \param to_topic Destination topic of forwarded messages.
@@ -707,6 +710,16 @@
typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
/*!
+ * \brief Callback to post the aggregate cache entry message.
+ * \since 12.2.0
+ *
+ * \param aggregate The aggregate snapshot message to post.
+ *
+ * \return Nothing
+ */
+typedef void (*cache_aggregate_post_fn)(struct stasis_message *aggregate);
+
+/*!
* \brief Get the aggregate cache entry snapshot.
* \since 12.2.0
*
@@ -772,14 +785,15 @@
* The returned object is AO2 managed, so ao2_cleanup() when you're done.
*
* \param id_fn Callback to extract the id from a snapshot message.
- * \param aggregate_fn Callback to calculate the aggregate cache entry.
+ * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
+ * \param aggregate_post_fn Callback to post the aggregate cache entry.
*
* \retval New cache indexed by \a id_fn.
* \retval \c NULL on error
*
* \since 12.2.0
*/
-struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn);
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn);
/*!
* \brief Create a topic which monitors and caches messages from another topic.
@@ -882,11 +896,14 @@
*
* \retval Message from the cache.
* \retval \c NULL if message is not found.
+ *
+ * \since 12.2.0
*/
struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
/*!
* \brief Retrieve all matching items from the cache.
+ * \since 12.2.0
*
* \param cache The cache to query.
* \param type Type of message to retrieve.
@@ -912,6 +929,7 @@
/*!
* \brief Dump cached items to a subscription for a specific entity.
+ * \since 12.2.0
*
* \param cache The cache to query.
* \param type Type of message to dump (any type if \c NULL).
Modified: team/rmudgett/stasis_cache/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/devicestate.c?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/main/devicestate.c (original)
+++ team/rmudgett/stasis_cache/main/devicestate.c Wed Feb 26 18:12:58 2014
@@ -282,29 +282,30 @@
static enum ast_device_state devstate_cached(const char *device)
{
- RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
+ struct stasis_message *cached_msg;
struct ast_device_state_message *device_state;
-
- cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
+ enum ast_device_state state;
+
+ cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+ ast_device_state_message_type(), device, NULL);
if (!cached_msg) {
return AST_DEVICE_UNKNOWN;
}
device_state = stasis_message_data(cached_msg);
-
- return device_state->state;
+ state = device_state->state;
+ ao2_cleanup(cached_msg);
+
+ return state;
}
/*! \brief Check device state through channel specific function or generic function */
static enum ast_device_state _ast_device_state(const char *device, int check_cache)
{
- char *buf;
char *number;
const struct ast_channel_tech *chan_tech;
enum ast_device_state res;
/*! \brief Channel driver that provides device state */
char *tech;
- /*! \brief Another provider of device state */
- char *provider = NULL;
/* If the last known state is cached, just return that */
if (check_cache) {
@@ -314,16 +315,18 @@
}
}
- buf = ast_strdupa(device);
- tech = strsep(&buf, "/");
- if (!(number = buf)) {
+ number = ast_strdupa(device);
+ tech = strsep(&number, "/");
+ if (!number) {
+ /*! \brief Another provider of device state */
+ char *provider;
+
provider = strsep(&tech, ":");
if (!tech) {
return AST_DEVICE_INVALID;
}
/* We have a provider */
number = tech;
- tech = NULL;
ast_debug(3, "Checking if I can find provider for \"%s\" - number: %s\n", provider, number);
return getproviderstate(provider, number);
@@ -331,18 +334,21 @@
ast_debug(4, "No provider found, checking channel drivers for %s - %s\n", tech, number);
- if (!(chan_tech = ast_get_channel_tech(tech)))
+ chan_tech = ast_get_channel_tech(tech);
+ if (!chan_tech) {
return AST_DEVICE_INVALID;
-
- if (!(chan_tech->devicestate)) /* Does the channel driver support device state notification? */
- return ast_parse_device_state(device); /* No, try the generic function */
+ }
+
+ /* Does the channel driver support device state notification? */
+ if (!chan_tech->devicestate) {
+ /* No, try the generic function */
+ return ast_parse_device_state(device);
+ }
res = chan_tech->devicestate(number);
-
- if (res != AST_DEVICE_UNKNOWN)
- return res;
-
- res = ast_parse_device_state(device);
+ if (res == AST_DEVICE_UNKNOWN) {
+ res = ast_parse_device_state(device);
+ }
return res;
}
@@ -520,106 +526,53 @@
return NULL;
}
-#define MAX_SERVERS 64
-static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
-{
- struct stasis_message *msg = obj;
- struct ast_devstate_aggregate *aggregate = arg;
- char *device = data;
- struct ast_device_state_message *device_state = stasis_message_data(msg);
-
- if (!device_state->eid || strcmp(device, device_state->device)) {
- /* ignore aggregate states and devices that don't match */
- return 0;
- }
- ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
- ast_devstate2str(device_state->state), device);
- ast_devstate_aggregate_add(aggregate, device_state->state);
- return 0;
-}
-
static void device_state_dtor(void *obj)
{
struct ast_device_state_message *device_state = obj;
- ast_string_field_free_memory(device_state);
+
+ ast_free((char *) device_state->device);
ast_free(device_state->eid);
}
static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid)
{
- RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup);
-
- if (!new_device_state || ast_string_field_init(new_device_state, 256)) {
+ struct ast_device_state_message *new_device_state;
+
+ ast_assert(!ast_strlen_zero(device));
+
+ new_device_state = ao2_alloc_options(sizeof(*new_device_state), device_state_dtor,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
+ if (!new_device_state) {
return NULL;
}
- ast_string_field_set(new_device_state, device, device);
+ new_device_state->device = ast_strdup(device);
+ if (!new_device_state->device) {
+ ao2_cleanup(new_device_state);
+ return NULL;
+ }
+ if (eid) {
+ /* non-aggregate device state. */
+ new_device_state->eid = ast_malloc(sizeof(*eid));
+ if (!new_device_state->eid) {
+ ao2_cleanup(new_device_state);
+ return NULL;
+ }
+ *new_device_state->eid = *eid;
+ }
new_device_state->state = state;
new_device_state->cachable = cachable;
- if (eid) {
- char eid_str[20];
- struct ast_str *cache_id = ast_str_alloca(256);
-
- new_device_state->eid = ast_malloc(sizeof(*eid));
- if (!new_device_state->eid) {
- return NULL;
- }
-
- *new_device_state->eid = *eid;
- ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid);
- ast_str_set(&cache_id, 0, "%s%s", eid_str, device);
- ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id));
- } else {
- /* no EID makes this an aggregate state */
- ast_string_field_set(new_device_state, cache_id, device);
- }
-
- ao2_ref(new_device_state, +1);
return new_device_state;
}
-static enum ast_device_state get_aggregate_state(char *device)
-{
- RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
- struct ast_devstate_aggregate aggregate;
-
- ast_devstate_aggregate_init(&aggregate);
-
- cached = stasis_cache_dump(ast_device_state_cache(), NULL);
-
- ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
-
- return ast_devstate_aggregate_result(&aggregate);
-}
-
-static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state)
-{
- RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
- struct ast_device_state_message *cached_aggregate_device_state;
-
- cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
- if (!cached_aggregate_msg) {
- return 1;
- }
-
- cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg);
- if (cached_aggregate_device_state->state == new_aggregate_state) {
- return 0;
- }
- return 1;
-}
-
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
-{
- enum ast_device_state aggregate_state;
- char *device;
+static void devstate_change_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+{
struct ast_device_state_message *device_state;
- RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup);
- RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup);
if (stasis_cache_update_type() == stasis_message_type(msg)) {
struct stasis_cache_update *update = stasis_message_data(msg);
+
if (!update->new_snapshot) {
return;
}
@@ -631,37 +584,17 @@
}
device_state = stasis_message_data(msg);
-
- if (!device_state->eid) {
- /* ignore aggregate messages */
+ if (device_state->cachable == AST_DEVSTATE_CACHABLE || !device_state->eid) {
+ /* Ignore cacheable and aggregate messages. */
return;
}
- device = ast_strdupa(device_state->device);
- ast_debug(1, "Processing device state change for '%s'\n", device);
-
- if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
- /* if it's not cachable, there will be no aggregate state to get
- * and this should be passed through */
- aggregate_state = device_state->state;
- } else {
-
- aggregate_state = get_aggregate_state(device);
- ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
- ast_devstate2str(aggregate_state), device);
-
- if (!aggregate_state_changed(device, aggregate_state)) {
- /* No change since last reported device state */
- ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
- device, ast_devstate2str(aggregate_state));
- return;
- }
- }
-
- ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
- device, ast_devstate2str(aggregate_state));
-
- ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL);
+ /*
+ * Non-cacheable device state aggregates are just the
+ * device state republished as the aggregate.
+ */
+ ast_publish_device_state_full(device_state->device, device_state->state,
+ device_state->cachable, NULL);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -736,26 +669,30 @@
int ast_device_state_clear_cache(const char *device)
{
- RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
- if (!(cached_msg = stasis_cache_get(ast_device_state_cache(),
- ast_device_state_message_type(), device))) {
+ struct stasis_message *cached_msg;
+ struct stasis_message *msg;
+
+ cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+ ast_device_state_message_type(), device, &ast_eid_default);
+ if (!cached_msg) {
/* nothing to clear */
return -1;
}
msg = stasis_cache_clear_create(cached_msg);
- stasis_publish(ast_device_state_topic(device), msg);
+ if (msg) {
+ stasis_publish(ast_device_state_topic(device), msg);
+ }
+ ao2_cleanup(msg);
+ ao2_cleanup(cached_msg);
return 0;
}
-/* BUGBUG device_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
int ast_publish_device_state_full(
- const char *device,
- enum ast_device_state state,
- enum ast_devstate_cache cachable,
- struct ast_eid *eid)
+ const char *device,
+ enum ast_device_state state,
+ enum ast_devstate_cache cachable,
+ struct ast_eid *eid)
{
RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
@@ -769,7 +706,7 @@
}
message = stasis_message_create_full(ast_device_state_message_type(), device_state,
- eid ?: &ast_eid_default);
+ eid);
if (!message) {
return -1;
}
@@ -786,6 +723,7 @@
static const char *device_state_get_id(struct stasis_message *message)
{
struct ast_device_state_message *device_state;
+
if (ast_device_state_message_type() != stasis_message_type(message)) {
return NULL;
}
@@ -795,20 +733,123 @@
return NULL;
}
- return device_state->cache_id;
+ return device_state->device;
+}
+
+/*!
+ * \internal
+ * \brief Callback to post the aggregate device state cache entry message.
+ * \since 12.2.0
+ *
+ * \param aggregate The aggregate snapshot message to post.
+ *
+ * \return Nothing
+ */
+static void device_state_aggregate_post(struct stasis_message *aggregate)
+{
+ const char *device;
+ struct stasis_topic *device_specific_topic;
+
+ device = device_state_get_id(aggregate);
+ if (!device) {
+ return;
+ }
+ device_specific_topic = ast_device_state_topic(device);
+ if (!device_specific_topic) {
+ return;
+ }
+
+ stasis_publish(device_specific_topic, aggregate);
+}
+
+/*!
+ * \internal
+ * \brief Callback to calculate the aggregate device state cache entry.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to calculate a new aggregate snapshot.
+ * \param new_snapshot The snapshot that is being updated.
+ *
+ * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
+ * if a new aggregate could not be calculated because of error.
+ *
+ * \return New aggregate-snapshot calculated on success.
+ * Caller has a reference on return.
+ */
+static struct stasis_message *device_state_aggregate_calc(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
+{
+ struct stasis_message *aggregate_snapshot;
+ struct stasis_message *snapshot;
+ struct ast_device_state_message *device_state;
+ const char *device = NULL;
+ struct ast_devstate_aggregate aggregate;
+ int idx;
+
+ /* Determine the new aggregate device state. */
+ ast_devstate_aggregate_init(&aggregate);
+ snapshot = stasis_cache_entry_get_internal(entry);
+ if (snapshot) {
+ device_state = stasis_message_data(snapshot);
+ device = device_state->device;
+ ast_devstate_aggregate_add(&aggregate, device_state->state);
+ }
+ for (idx = 0; ; ++idx) {
+ snapshot = stasis_cache_entry_get_external(entry, idx);
+ if (!snapshot) {
+ break;
+ }
+
+ device_state = stasis_message_data(snapshot);
+ device = device_state->device;
+ ast_devstate_aggregate_add(&aggregate, device_state->state);
+ }
+
+ if (!device) {
+ /* There are no device states cached. Delete the aggregate. */
+ return NULL;
+ }
+
+ snapshot = stasis_cache_entry_get_aggregate(entry);
+ if (snapshot) {
+ device_state = stasis_message_data(snapshot);
+ if (device_state->state == ast_devstate_aggregate_result(&aggregate)) {
+ /* Aggregate device state did not change. */
+ return ao2_bump(snapshot);
+ }
+ }
+
+ device_state = device_state_alloc(device, ast_devstate_aggregate_result(&aggregate),
+ AST_DEVSTATE_CACHABLE, NULL);
+ if (!device_state) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ return ao2_bump(snapshot);
+ }
+ aggregate_snapshot = stasis_message_create_full(ast_device_state_message_type(),
+ device_state, NULL);
+ ao2_cleanup(device_state);
+ if (!aggregate_snapshot) {
+ /* Bummer. We have to keep the old aggregate snapshot. */
+ return ao2_bump(snapshot);
+ }
+
+ return aggregate_snapshot;
}
static void devstate_cleanup(void)
{
devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
+ device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
+
+ ao2_cleanup(device_state_cache);
+ device_state_cache = NULL;
+
+ ao2_cleanup(device_state_topic_pool);
+ device_state_topic_pool = NULL;
+
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
- ao2_cleanup(device_state_cache);
- device_state_cache = NULL;
- device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
+
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
- ao2_cleanup(device_state_topic_pool);
- device_state_topic_pool = NULL;
}
int devstate_init(void)
@@ -820,25 +861,32 @@
}
device_state_topic_all = stasis_topic_create("ast_device_state_topic");
if (!device_state_topic_all) {
- return -1;
- }
- device_state_cache = stasis_cache_create(device_state_get_id);
- if (!device_state_cache) {
- return -1;
- }
- device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
- if (!device_state_topic_cached) {
+ devstate_cleanup();
return -1;
}
device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
if (!device_state_topic_pool) {
- return -1;
- }
-
- devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL);
-
+ devstate_cleanup();
+ return -1;
+ }
+ device_state_cache = stasis_cache_create_full(device_state_get_id,
+ device_state_aggregate_calc, device_state_aggregate_post);
+ if (!device_state_cache) {
+ devstate_cleanup();
+ return -1;
+ }
+ device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all,
+ device_state_cache);
+ if (!device_state_topic_cached) {
+ devstate_cleanup();
+ return -1;
+ }
+
+ devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(),
+ devstate_change_cb, NULL);
if (!devstate_message_sub) {
ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+ devstate_cleanup();
return -1;
}
Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Wed Feb 26 18:12:58 2014
@@ -48,7 +48,8 @@
struct stasis_cache {
struct ao2_container *entries;
snapshot_get_id id_fn;
- cache_aggregate_calc_fn aggregate_fn;
+ cache_aggregate_calc_fn aggregate_calc_fn;
+ cache_aggregate_post_fn aggregate_post_fn;
};
/*! \internal */
@@ -277,7 +278,8 @@
cache->entries = NULL;
}
-struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn)
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
+ cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn)
{
struct stasis_cache *cache;
@@ -295,14 +297,15 @@
}
cache->id_fn = id_fn;
- cache->aggregate_fn = aggregate_fn;
+ cache->aggregate_calc_fn = aggregate_calc_fn;
+ cache->aggregate_post_fn = aggregate_post_fn;
return cache;
}
struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
{
- return stasis_cache_create_full(id_fn, NULL);
+ return stasis_cache_create_full(id_fn, NULL, NULL);
}
struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
@@ -473,8 +476,8 @@
}
/* Update the aggregate snapshot. */
- if (cache->aggregate_fn && cached_entry) {
- snapshots.aggregate_new = cache->aggregate_fn(cached_entry, new_snapshot);
+ if (cache->aggregate_calc_fn && cached_entry) {
+ snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
snapshots.aggregate_old = cached_entry->aggregate;
cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
}
@@ -726,9 +729,11 @@
{
struct stasis_caching_topic *caching_topic_needs_unref;
struct stasis_caching_topic *caching_topic = data;
- const char *id;
- struct stasis_message *update;
- struct cache_put_snapshots snapshots;
+ struct stasis_message *msg;
+ struct stasis_message *msg_put;
+ struct stasis_message_type *msg_type;
+ const struct ast_eid *msg_eid;
+ const char *msg_id;
ast_assert(caching_topic != NULL);
ast_assert(caching_topic->topic != NULL);
@@ -741,58 +746,44 @@
caching_topic_needs_unref = NULL;
}
- /* Handle cache clear event */
- if (stasis_cache_clear_type() == stasis_message_type(message)) {
- struct stasis_message *clear_msg = stasis_message_data(message);
- const char *clear_id = caching_topic->cache->id_fn(clear_msg);
- struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
-
- ast_assert(clear_type != NULL);
-
- if (clear_id) {
- snapshots = cache_put(caching_topic->cache, clear_type, clear_id,
- stasis_message_eid(clear_msg), NULL);
- if (snapshots.old) {
- update = update_create(snapshots.old, NULL);
- if (update) {
- stasis_publish(caching_topic->topic, update);
- }
- ao2_cleanup(update);
- } else {
- ast_log(LOG_ERROR,
- "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
- stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
+ msg_type = stasis_message_type(message);
+ if (stasis_cache_clear_type() == msg_type) {
+ /* Cache clear event. */
+ msg_put = NULL;
+ msg = stasis_message_data(message);
+ msg_type = stasis_message_type(msg);
+ } else {
+ /* Normal cache update event. */
+ msg_put = message;
+ msg = message;
+ }
+ ast_assert(msg_type != NULL);
+
+ msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
+ msg_id = caching_topic->cache->id_fn(msg);
+ if (msg_id && msg_eid) {
+ struct stasis_message *update;
+ struct cache_put_snapshots snapshots;
+
+ /* Update the cache */
+ snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
+ if (snapshots.old || msg_put) {
+ update = update_create(snapshots.old, msg_put);
+ if (update) {
+ stasis_publish(caching_topic->topic, update);
}
-
- if (snapshots.aggregate_old != snapshots.aggregate_new) {
- update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
- if (update) {
- stasis_publish(caching_topic->topic, update);
- }
- ao2_cleanup(update);
+ ao2_cleanup(update);
+ } else {
+ ast_log(LOG_ERROR,
+ "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
+ stasis_topic_name(caching_topic->topic),
+ stasis_message_type_name(msg_type), msg_id);
+ }
+
+ if (snapshots.aggregate_old != snapshots.aggregate_new) {
+ if (snapshots.aggregate_new && caching_topic->cache->aggregate_post_fn) {
+ caching_topic->cache->aggregate_post_fn(snapshots.aggregate_new);
}
-
- ao2_cleanup(snapshots.old);
- ao2_cleanup(snapshots.aggregate_old);
- ao2_cleanup(snapshots.aggregate_new);
- }
- ao2_cleanup(caching_topic_needs_unref);
- return;
- }
-
- id = caching_topic->cache->id_fn(message);
- if (id) {
- /* Update the cache */
- snapshots = cache_put(caching_topic->cache, stasis_message_type(message), id,
- stasis_message_eid(message), message);
-
- update = update_create(snapshots.old, message);
- if (update) {
- stasis_publish(caching_topic->topic, update);
- }
- ao2_cleanup(update);
-
- if (snapshots.aggregate_old != snapshots.aggregate_new) {
update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
if (update) {
stasis_publish(caching_topic->topic, update);
@@ -804,6 +795,7 @@
ao2_cleanup(snapshots.aggregate_old);
ao2_cleanup(snapshots.aggregate_new);
}
+
ao2_cleanup(caching_topic_needs_unref);
}
Modified: team/rmudgett/stasis_cache/tests/test_devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_devicestate.c?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_devicestate.c (original)
+++ team/rmudgett/stasis_cache/tests/test_devicestate.c Wed Feb 26 18:12:58 2014
@@ -277,45 +277,49 @@
}
struct consumer {
- ast_mutex_t lock;
ast_cond_t out;
int already_out;
+ int sig_on_non_aggregate_state;
+ int event_count;
enum ast_device_state state;
enum ast_device_state aggregate_state;
- int sig_on_non_aggregate_state;
};
-static void consumer_dtor(void *obj) {
+static void consumer_dtor(void *obj)
+{
struct consumer *consumer = obj;
- ast_mutex_destroy(&consumer->lock);
ast_cond_destroy(&consumer->out);
}
-static struct consumer *consumer_create(void) {
- RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+static void consumer_reset(struct consumer *consumer)
+{
+ consumer->already_out = 0;
+ consumer->event_count = 0;
+ consumer->state = AST_DEVICE_TOTAL;
+ consumer->aggregate_state = AST_DEVICE_TOTAL;
+}
+
+static struct consumer *consumer_create(void)
+{
+ struct consumer *consumer;
consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
-
if (!consumer) {
return NULL;
}
- ast_mutex_init(&consumer->lock);
ast_cond_init(&consumer->out, NULL);
- consumer->sig_on_non_aggregate_state = 0;
-
- ao2_ref(consumer, +1);
+ consumer_reset(consumer);
+
return consumer;
}
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
- RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
struct stasis_cache_update *cache_update = stasis_message_data(message);
struct ast_device_state_message *device_state;
- SCOPED_MUTEX(lock, &consumer->lock);
if (!cache_update->new_snapshot) {
return;
@@ -328,17 +332,22 @@
return;
}
- if (device_state->eid) {
- consumer->state = device_state->state;
- if (consumer->sig_on_non_aggregate_state) {
- consumer->sig_on_non_aggregate_state = 0;
+ {
+ SCOPED_AO2LOCK(lock, consumer);
+
+ ++consumer->event_count;
+ if (device_state->eid) {
+ consumer->state = device_state->state;
+ if (consumer->sig_on_non_aggregate_state) {
+ consumer->sig_on_non_aggregate_state = 0;
+ consumer->already_out = 1;
+ ast_cond_signal(&consumer->out);
+ }
+ } else {
+ consumer->aggregate_state = device_state->state;
consumer->already_out = 1;
ast_cond_signal(&consumer->out);
}
- } else {
- consumer->aggregate_state = device_state->state;
- consumer->already_out = 1;
- ast_cond_signal(&consumer->out);
}
}
@@ -360,57 +369,57 @@
.tv_nsec = start.tv_usec * 1000
};
- SCOPED_MUTEX(lock, &consumer->lock);
-
- if (consumer->already_out) {
- consumer->already_out = 0;
- }
-
- while(1) {
- res = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+ SCOPED_AO2LOCK(lock, consumer);
+
+ while (!consumer->already_out) {
+ res = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
if (!res || res == ETIMEDOUT) {
break;
}
}
- consumer->already_out = 0;
}
static int remove_device_states_cb(void *obj, void *arg, int flags)
{
- RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
+ struct stasis_message *msg = obj;
struct ast_device_state_message *device_state = stasis_message_data(msg);
+
if (strcmp(UNIT_TEST_DEVICE_IDENTIFIER, device_state->device)) {
- msg = NULL;
+ /* Not a unit test device */
return 0;
}
msg = stasis_cache_clear_create(msg);
- /* topic guaranteed to have been created by this point */
- stasis_publish(ast_device_state_topic(device_state->device), msg);
+ if (msg) {
+ /* topic guaranteed to have been created by this point */
+ stasis_publish(ast_device_state_topic(device_state->device), msg);
+ }
+ ao2_cleanup(msg);
return 0;
}
static void cache_cleanup(int unused)
{
- RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
+ struct ao2_container *cache_dump;
+
/* remove all device states created during this test */
- cache_dump = stasis_cache_dump(ast_device_state_cache(), NULL);
+ cache_dump = stasis_cache_dump_full(ast_device_state_cache(), NULL, NULL);
if (!cache_dump) {
return;
}
ao2_callback(cache_dump, 0, remove_device_states_cb, NULL);
-}
-
-/* BUGBUG this test currently fails because the device state is storing the information in a different place now. */
+ ao2_cleanup(cache_dump);
+}
+
AST_TEST_DEFINE(device_state_aggregation_test)
{
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, device_msg_router, NULL, stasis_message_router_unsubscribe);
RAII_VAR(struct ast_eid *, foreign_eid, NULL, ast_free);
RAII_VAR(int, cleanup_cache, 0, cache_cleanup);
+ RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
int res;
struct ast_device_state_message *device_state;
- struct stasis_message *msg;
switch (cmd) {
case TEST_INIT:
@@ -448,56 +457,67 @@
/* push local state */
ast_publish_device_state(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_NOT_INUSE, AST_DEVSTATE_CACHABLE);
+ /* Check cache aggregate state immediately */
+ ao2_cleanup(msg);
+ msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+ device_state = stasis_message_data(msg);
+ ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
+
consumer_wait_for(consumer);
ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state);
ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state);
-
- msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
- device_state = stasis_message_data(msg);
- ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
- ao2_cleanup(msg);
- msg = NULL;
+ ast_test_validate(test, 2 == consumer->event_count);
+ consumer_reset(consumer);
/* push remote state */
/* this will not produce a new aggregate state message since the aggregate state does not change */
consumer->sig_on_non_aggregate_state = 1;
ast_publish_device_state_full(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_NOT_INUSE, AST_DEVSTATE_CACHABLE, foreign_eid);
+ /* Check cache aggregate state immediately */
+ ao2_cleanup(msg);
+ msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+ device_state = stasis_message_data(msg);
+ ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
+
+ /* Check for expected events. */
consumer_wait_for(consumer);
ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state);
- ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state);
-
- msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
- device_state = stasis_message_data(msg);
- ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
- ao2_cleanup(msg);
- msg = NULL;
+ ast_test_validate(test, AST_DEVICE_TOTAL == consumer->aggregate_state);
+ ast_test_validate(test, 1 == consumer->event_count);
+ consumer_reset(consumer);
/* push remote state different from local state */
ast_publish_device_state_full(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_INUSE, AST_DEVSTATE_CACHABLE, foreign_eid);
+ /* Check cache aggregate state immediately */
+ ao2_cleanup(msg);
+ msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+ device_state = stasis_message_data(msg);
+ ast_test_validate(test, AST_DEVICE_INUSE == device_state->state);
+
+ /* Check for expected events. */
consumer_wait_for(consumer);
ast_test_validate(test, AST_DEVICE_INUSE == consumer->state);
ast_test_validate(test, AST_DEVICE_INUSE == consumer->aggregate_state);
-
- msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
- device_state = stasis_message_data(msg);
- ast_test_validate(test, AST_DEVICE_INUSE == device_state->state);
- ao2_cleanup(msg);
- msg = NULL;
+ ast_test_validate(test, 2 == consumer->event_count);
+ consumer_reset(consumer);
/* push local state that will cause aggregated state different from local non-aggregate state */
ast_publish_device_state(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_RINGING, AST_DEVSTATE_CACHABLE);
+ /* Check cache aggregate state immediately */
+ ao2_cleanup(msg);
+ msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+ device_state = stasis_message_data(msg);
+ ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state);
+
+ /* Check for expected events. */
consumer_wait_for(consumer);
ast_test_validate(test, AST_DEVICE_RINGING == consumer->state);
ast_test_validate(test, AST_DEVICE_RINGINUSE == consumer->aggregate_state);
-
- msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
- device_state = stasis_message_data(msg);
- ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state);
- ao2_cleanup(msg);
- msg = NULL;
+ ast_test_validate(test, 2 == consumer->event_count);
+ consumer_reset(consumer);
return AST_TEST_PASS;
}
More information about the asterisk-commits mailing list