[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r408986 - in /team/rmudgett/stasis_cac...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Feb 26 18:13:03 CST 2014


Author: rmudgett
Date: Wed Feb 26 18:12:58 2014
New Revision: 408986

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=408986
Log:
device state: Update device state to use the new cache scheme.

* Make device state use the new cache scheme.

* Reordered devicestate.c init/cleanup to match.

* Added aggregate_post_fn to the cache to post aggregate updates when they
change.

* Reduced redundant code in cache_put().

* Remove some unnecessary RAII_VAR() usage in devicestate.c and
test_devicestate.c.

* Simplified some coding in _ast_device_state().

* Made the device state unit test consumer ao2 object use the ao2 lock
instead of a redundant lock in the struct for ast_cond_wait().

* Made the device state unit test look for the aggregate device state in
the right place.  This fixes the unit test.

Modified:
    team/rmudgett/stasis_cache/include/asterisk/devicestate.h
    team/rmudgett/stasis_cache/include/asterisk/stasis.h
    team/rmudgett/stasis_cache/main/devicestate.c
    team/rmudgett/stasis_cache/main/stasis_cache.c
    team/rmudgett/stasis_cache/tests/test_devicestate.c

Modified: team/rmudgett/stasis_cache/include/asterisk/devicestate.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/devicestate.h?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/devicestate.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/devicestate.h Wed Feb 26 18:12:58 2014
@@ -273,13 +273,11 @@
  * \brief The structure that contains device state
  * \since 12
  */
+/* BUGBUG the changes to this struct should only be done on trunk.  Back them out for v12 version. */
 struct ast_device_state_message {
-	AST_DECLARE_STRING_FIELDS(
-		AST_STRING_FIELD(cache_id);	/*!< A unique ID used for hashing */
-		AST_STRING_FIELD(device);	/*!< The name of the device */
-	);
+	const char *device;					/*!< The name of the device */
+	struct ast_eid *eid;				/*!< The EID of the server where this message originated, NULL EID means aggregate state */
 	enum ast_device_state state;		/*!< The state of the device */
-	struct ast_eid *eid;			/*!< The EID of the server where this message originated, NULL EID means aggregate state */
 	enum ast_devstate_cache cachable;	/*!< Flag designating the cachability of this device state */
 };
 

Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Wed Feb 26 18:12:58 2014
@@ -311,11 +311,14 @@
  *
  * \return New message
  * \return \c NULL on error
+ *
+ * \since 12.2.0
  */
 struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
 
 /*!
  * \brief Get the entity id for a \ref stasis_message.
+ * \since 12.2.0
  *
  * \param msg Message to get eid.
  *
@@ -531,8 +534,8 @@
  * \brief Create a subscription which forwards all messages from one topic to
  * another.
  *
- * Note that the \a topic parameter of the invoked callback will the be \a topic
- * the message was sent to, not the topic the subscriber subscribed to.
+ * Note that the \a topic parameter of the invoked callback will be the
+ * \a topic the message was sent to, not the topic the subscriber subscribed to.
  *
  * \param from_topic Topic to forward.
  * \param to_topic Destination topic of forwarded messages.
@@ -707,6 +710,16 @@
 typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
 
 /*!
+ * \brief Callback to post the aggregate cache entry message.
+ * \since 12.2.0
+ *
+ * \param aggregate The aggregate snapshot message to post.
+ *
+ * \return Nothing
+ */
+typedef void (*cache_aggregate_post_fn)(struct stasis_message *aggregate);
+
+/*!
  * \brief Get the aggregate cache entry snapshot.
  * \since 12.2.0
  *
@@ -772,14 +785,15 @@
  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
  *
  * \param id_fn Callback to extract the id from a snapshot message.
- * \param aggregate_fn Callback to calculate the aggregate cache entry.
+ * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
+ * \param aggregate_post_fn Callback to post the aggregate cache entry.
  *
  * \retval New cache indexed by \a id_fn.
  * \retval \c NULL on error
  *
  * \since 12.2.0
  */
-struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn);
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn);
 
 /*!
  * \brief Create a topic which monitors and caches messages from another topic.
@@ -882,11 +896,14 @@
  *
  * \retval Message from the cache.
  * \retval \c NULL if message is not found.
+ *
+ * \since 12.2.0
  */
 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
 
 /*!
  * \brief Retrieve all matching items from the cache.
+ * \since 12.2.0
  *
  * \param cache The cache to query.
  * \param type Type of message to retrieve.
@@ -912,6 +929,7 @@
 
 /*!
  * \brief Dump cached items to a subscription for a specific entity.
+ * \since 12.2.0
  *
  * \param cache The cache to query.
  * \param type Type of message to dump (any type if \c NULL).

Modified: team/rmudgett/stasis_cache/main/devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/devicestate.c?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/main/devicestate.c (original)
+++ team/rmudgett/stasis_cache/main/devicestate.c Wed Feb 26 18:12:58 2014
@@ -282,29 +282,30 @@
 
 static enum ast_device_state devstate_cached(const char *device)
 {
-	RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
+	struct stasis_message *cached_msg;
 	struct ast_device_state_message *device_state;
-
-	cached_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
+	enum ast_device_state state;
+
+	cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+		ast_device_state_message_type(), device, NULL);
 	if (!cached_msg) {
 		return AST_DEVICE_UNKNOWN;
 	}
 	device_state = stasis_message_data(cached_msg);
-
-	return device_state->state;
+	state = device_state->state;
+	ao2_cleanup(cached_msg);
+
+	return state;
 }
 
 /*! \brief Check device state through channel specific function or generic function */
 static enum ast_device_state _ast_device_state(const char *device, int check_cache)
 {
-	char *buf;
 	char *number;
 	const struct ast_channel_tech *chan_tech;
 	enum ast_device_state res;
 	/*! \brief Channel driver that provides device state */
 	char *tech;
-	/*! \brief Another provider of device state */
-	char *provider = NULL;
 
 	/* If the last known state is cached, just return that */
 	if (check_cache) {
@@ -314,16 +315,18 @@
 		}
 	}
 
-	buf = ast_strdupa(device);
-	tech = strsep(&buf, "/");
-	if (!(number = buf)) {
+	number = ast_strdupa(device);
+	tech = strsep(&number, "/");
+	if (!number) {
+		/*! \brief Another provider of device state */
+		char *provider;
+
 		provider = strsep(&tech, ":");
 		if (!tech) {
 			return AST_DEVICE_INVALID;
 		}
 		/* We have a provider */
 		number = tech;
-		tech = NULL;
 
 		ast_debug(3, "Checking if I can find provider for \"%s\" - number: %s\n", provider, number);
 		return getproviderstate(provider, number);
@@ -331,18 +334,21 @@
 
 	ast_debug(4, "No provider found, checking channel drivers for %s - %s\n", tech, number);
 
-	if (!(chan_tech = ast_get_channel_tech(tech)))
+	chan_tech = ast_get_channel_tech(tech);
+	if (!chan_tech) {
 		return AST_DEVICE_INVALID;
-
-	if (!(chan_tech->devicestate)) /* Does the channel driver support device state notification? */
-		return ast_parse_device_state(device); /* No, try the generic function */
+	}
+
+	/* Does the channel driver support device state notification? */
+	if (!chan_tech->devicestate) {
+		/* No, try the generic function */
+		return ast_parse_device_state(device);
+	}
 
 	res = chan_tech->devicestate(number);
-
-	if (res != AST_DEVICE_UNKNOWN)
-		return res;
-
-	res = ast_parse_device_state(device);
+	if (res == AST_DEVICE_UNKNOWN) {
+		res = ast_parse_device_state(device);
+	}
 
 	return res;
 }
@@ -520,106 +526,53 @@
 	return NULL;
 }
 
-#define MAX_SERVERS 64
-static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
-{
-	struct stasis_message *msg = obj;
-	struct ast_devstate_aggregate *aggregate = arg;
-	char *device = data;
-	struct ast_device_state_message *device_state = stasis_message_data(msg);
-
-	if (!device_state->eid || strcmp(device, device_state->device)) {
-		/* ignore aggregate states and devices that don't match */
-		return 0;
-	}
-	ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
-		ast_devstate2str(device_state->state), device);
-	ast_devstate_aggregate_add(aggregate, device_state->state);
-	return 0;
-}
-
 static void device_state_dtor(void *obj)
 {
 	struct ast_device_state_message *device_state = obj;
-	ast_string_field_free_memory(device_state);
+
+	ast_free((char *) device_state->device);
 	ast_free(device_state->eid);
 }
 
 static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid)
 {
-	RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup);
-
-	if (!new_device_state || ast_string_field_init(new_device_state, 256)) {
+	struct ast_device_state_message *new_device_state;
+
+	ast_assert(!ast_strlen_zero(device));
+
+	new_device_state = ao2_alloc_options(sizeof(*new_device_state), device_state_dtor,
+		AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!new_device_state) {
 		return NULL;
 	}
 
-	ast_string_field_set(new_device_state, device, device);
+	new_device_state->device = ast_strdup(device);
+	if (!new_device_state->device) {
+		ao2_cleanup(new_device_state);
+		return NULL;
+	}
+	if (eid) {
+		/* non-aggregate device state. */
+		new_device_state->eid = ast_malloc(sizeof(*eid));
+		if (!new_device_state->eid) {
+			ao2_cleanup(new_device_state);
+			return NULL;
+		}
+		*new_device_state->eid = *eid;
+	}
 	new_device_state->state = state;
 	new_device_state->cachable = cachable;
 
-	if (eid) {
-		char eid_str[20];
-		struct ast_str *cache_id = ast_str_alloca(256);
-
-		new_device_state->eid = ast_malloc(sizeof(*eid));
-		if (!new_device_state->eid) {
-			return NULL;
-		}
-
-		*new_device_state->eid = *eid;
-		ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid);
-		ast_str_set(&cache_id, 0, "%s%s", eid_str, device);
-		ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id));
-	} else {
-		/* no EID makes this an aggregate state */
-		ast_string_field_set(new_device_state, cache_id, device);
-	}
-
-	ao2_ref(new_device_state, +1);
 	return new_device_state;
 }
 
-static enum ast_device_state get_aggregate_state(char *device)
-{
-	RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
-	struct ast_devstate_aggregate aggregate;
-
-	ast_devstate_aggregate_init(&aggregate);
-
-	cached = stasis_cache_dump(ast_device_state_cache(), NULL);
-
-	ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
-
-	return ast_devstate_aggregate_result(&aggregate);
-}
-
-static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state)
-{
-	RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
-	struct ast_device_state_message *cached_aggregate_device_state;
-
-	cached_aggregate_msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), device);
-	if (!cached_aggregate_msg) {
-		return 1;
-	}
-
-	cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg);
-	if (cached_aggregate_device_state->state == new_aggregate_state) {
-		return 0;
-	}
-	return 1;
-}
-
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
-{
-	enum ast_device_state aggregate_state;
-	char *device;
+static void devstate_change_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+{
 	struct ast_device_state_message *device_state;
-	RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup);
-	RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup);
 
 	if (stasis_cache_update_type() == stasis_message_type(msg)) {
 		struct stasis_cache_update *update = stasis_message_data(msg);
+
 		if (!update->new_snapshot) {
 			return;
 		}
@@ -631,37 +584,17 @@
 	}
 
 	device_state = stasis_message_data(msg);
-
-	if (!device_state->eid) {
-		/* ignore aggregate messages */
+	if (device_state->cachable == AST_DEVSTATE_CACHABLE || !device_state->eid) {
+		/* Ignore cacheable and aggregate messages. */
 		return;
 	}
 
-	device = ast_strdupa(device_state->device);
-	ast_debug(1, "Processing device state change for '%s'\n", device);
-
-	if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
-		/* if it's not cachable, there will be no aggregate state to get
-		 * and this should be passed through */
-		aggregate_state = device_state->state;
-	} else {
-
-		aggregate_state = get_aggregate_state(device);
-		ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
-			ast_devstate2str(aggregate_state), device);
-
-		if (!aggregate_state_changed(device, aggregate_state)) {
-			/* No change since last reported device state */
-			ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
-				device, ast_devstate2str(aggregate_state));
-			return;
-		}
-	}
-
-	ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
-		device, ast_devstate2str(aggregate_state));
-
-	ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL);
+	/*
+	 * Non-cacheable device state aggregates are just the
+	 * device state republished as the aggregate.
+	 */
+	ast_publish_device_state_full(device_state->device, device_state->state,
+		device_state->cachable, NULL);
 }
 
 /*! \brief Initialize the device state engine in separate thread */
@@ -736,26 +669,30 @@
 
 int ast_device_state_clear_cache(const char *device)
 {
-	RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-
-	if (!(cached_msg = stasis_cache_get(ast_device_state_cache(),
-					    ast_device_state_message_type(), device))) {
+	struct stasis_message *cached_msg;
+	struct stasis_message *msg;
+
+	cached_msg = stasis_cache_get_by_eid(ast_device_state_cache(),
+		ast_device_state_message_type(), device, &ast_eid_default);
+	if (!cached_msg) {
 		/* nothing to clear */
 		return -1;
 	}
 
 	msg = stasis_cache_clear_create(cached_msg);
-	stasis_publish(ast_device_state_topic(device), msg);
+	if (msg) {
+		stasis_publish(ast_device_state_topic(device), msg);
+	}
+	ao2_cleanup(msg);
+	ao2_cleanup(cached_msg);
 	return 0;
 }
 
-/* BUGBUG device_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
 int ast_publish_device_state_full(
-			const char *device,
-			enum ast_device_state state,
-			enum ast_devstate_cache cachable,
-			struct ast_eid *eid)
+	const char *device,
+	enum ast_device_state state,
+	enum ast_devstate_cache cachable,
+	struct ast_eid *eid)
 {
 	RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
@@ -769,7 +706,7 @@
 	}
 
 	message = stasis_message_create_full(ast_device_state_message_type(), device_state,
-		eid ?: &ast_eid_default);
+		eid);
 	if (!message) {
 		return -1;
 	}
@@ -786,6 +723,7 @@
 static const char *device_state_get_id(struct stasis_message *message)
 {
 	struct ast_device_state_message *device_state;
+
 	if (ast_device_state_message_type() != stasis_message_type(message)) {
 		return NULL;
 	}
@@ -795,20 +733,123 @@
 		return NULL;
 	}
 
-	return device_state->cache_id;
+	return device_state->device;
+}
+
+/*!
+ * \internal
+ * \brief Callback to post the aggregate device state cache entry message.
+ * \since 12.2.0
+ *
+ * \param aggregate The aggregate snapshot message to post.
+ *
+ * \return Nothing
+ */
+static void device_state_aggregate_post(struct stasis_message *aggregate)
+{
+	const char *device;
+	struct stasis_topic *device_specific_topic;
+
+	device = device_state_get_id(aggregate);
+	if (!device) {
+		return;
+	}
+	device_specific_topic = ast_device_state_topic(device);
+	if (!device_specific_topic) {
+		return;
+	}
+
+	stasis_publish(device_specific_topic, aggregate);
+}
+
+/*!
+ * \internal
+ * \brief Callback to calculate the aggregate device state cache entry.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to calculate a new aggregate snapshot.
+ * \param new_snapshot The snapshot that is being updated.
+ *
+ * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
+ * if a new aggregate could not be calculated because of error.
+ *
+ * \return New aggregate-snapshot calculated on success.
+ * Caller has a reference on return.
+ */
+static struct stasis_message *device_state_aggregate_calc(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
+{
+	struct stasis_message *aggregate_snapshot;
+	struct stasis_message *snapshot;
+	struct ast_device_state_message *device_state;
+	const char *device = NULL;
+	struct ast_devstate_aggregate aggregate;
+	int idx;
+
+	/* Determine the new aggregate device state. */
+	ast_devstate_aggregate_init(&aggregate);
+	snapshot = stasis_cache_entry_get_internal(entry);
+	if (snapshot) {
+		device_state = stasis_message_data(snapshot);
+		device = device_state->device;
+		ast_devstate_aggregate_add(&aggregate, device_state->state);
+	}
+	for (idx = 0; ; ++idx) {
+		snapshot = stasis_cache_entry_get_external(entry, idx);
+		if (!snapshot) {
+			break;
+		}
+
+		device_state = stasis_message_data(snapshot);
+		device = device_state->device;
+		ast_devstate_aggregate_add(&aggregate, device_state->state);
+	}
+
+	if (!device) {
+		/* There are no device states cached.  Delete the aggregate. */
+		return NULL;
+	}
+
+	snapshot = stasis_cache_entry_get_aggregate(entry);
+	if (snapshot) {
+		device_state = stasis_message_data(snapshot);
+		if (device_state->state == ast_devstate_aggregate_result(&aggregate)) {
+			/* Aggregate device state did not change. */
+			return ao2_bump(snapshot);
+		}
+	}
+
+	device_state = device_state_alloc(device, ast_devstate_aggregate_result(&aggregate),
+		AST_DEVSTATE_CACHABLE, NULL);
+	if (!device_state) {
+		/* Bummer.  We have to keep the old aggregate snapshot. */
+		return ao2_bump(snapshot);
+	}
+	aggregate_snapshot = stasis_message_create_full(ast_device_state_message_type(),
+		device_state, NULL);
+	ao2_cleanup(device_state);
+	if (!aggregate_snapshot) {
+		/* Bummer.  We have to keep the old aggregate snapshot. */
+		return ao2_bump(snapshot);
+	}
+
+	return aggregate_snapshot;
 }
 
 static void devstate_cleanup(void)
 {
 	devstate_message_sub = stasis_unsubscribe_and_join(devstate_message_sub);
+	device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
+
+	ao2_cleanup(device_state_cache);
+	device_state_cache = NULL;
+
+	ao2_cleanup(device_state_topic_pool);
+	device_state_topic_pool = NULL;
+
 	ao2_cleanup(device_state_topic_all);
 	device_state_topic_all = NULL;
-	ao2_cleanup(device_state_cache);
-	device_state_cache = NULL;
-	device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
+
 	STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
-	ao2_cleanup(device_state_topic_pool);
-	device_state_topic_pool = NULL;
 }
 
 int devstate_init(void)
@@ -820,25 +861,32 @@
 	}
 	device_state_topic_all = stasis_topic_create("ast_device_state_topic");
 	if (!device_state_topic_all) {
-		return -1;
-	}
-	device_state_cache = stasis_cache_create(device_state_get_id);
-	if (!device_state_cache) {
-		return -1;
-	}
-	device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_cache);
-	if (!device_state_topic_cached) {
+		devstate_cleanup();
 		return -1;
 	}
 	device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
 	if (!device_state_topic_pool) {
-		return -1;
-	}
-
-	devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(), devstate_change_collector_cb, NULL);
-
+		devstate_cleanup();
+		return -1;
+	}
+	device_state_cache = stasis_cache_create_full(device_state_get_id,
+		device_state_aggregate_calc, device_state_aggregate_post);
+	if (!device_state_cache) {
+		devstate_cleanup();
+		return -1;
+	}
+	device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all,
+		device_state_cache);
+	if (!device_state_topic_cached) {
+		devstate_cleanup();
+		return -1;
+	}
+
+	devstate_message_sub = stasis_subscribe(ast_device_state_topic_cached(),
+		devstate_change_cb, NULL);
 	if (!devstate_message_sub) {
 		ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
+		devstate_cleanup();
 		return -1;
 	}
 

Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Wed Feb 26 18:12:58 2014
@@ -48,7 +48,8 @@
 struct stasis_cache {
 	struct ao2_container *entries;
 	snapshot_get_id id_fn;
-	cache_aggregate_calc_fn aggregate_fn;
+	cache_aggregate_calc_fn aggregate_calc_fn;
+	cache_aggregate_post_fn aggregate_post_fn;
 };
 
 /*! \internal */
@@ -277,7 +278,8 @@
 	cache->entries = NULL;
 }
 
-struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn)
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
+	cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_post_fn aggregate_post_fn)
 {
 	struct stasis_cache *cache;
 
@@ -295,14 +297,15 @@
 	}
 
 	cache->id_fn = id_fn;
-	cache->aggregate_fn = aggregate_fn;
+	cache->aggregate_calc_fn = aggregate_calc_fn;
+	cache->aggregate_post_fn = aggregate_post_fn;
 
 	return cache;
 }
 
 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
 {
-	return stasis_cache_create_full(id_fn, NULL);
+	return stasis_cache_create_full(id_fn, NULL, NULL);
 }
 
 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
@@ -473,8 +476,8 @@
 	}
 
 	/* Update the aggregate snapshot. */
-	if (cache->aggregate_fn && cached_entry) {
-		snapshots.aggregate_new = cache->aggregate_fn(cached_entry, new_snapshot);
+	if (cache->aggregate_calc_fn && cached_entry) {
+		snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
 		snapshots.aggregate_old = cached_entry->aggregate;
 		cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
 	}
@@ -726,9 +729,11 @@
 {
 	struct stasis_caching_topic *caching_topic_needs_unref;
 	struct stasis_caching_topic *caching_topic = data;
-	const char *id;
-	struct stasis_message *update;
-	struct cache_put_snapshots snapshots;
+	struct stasis_message *msg;
+	struct stasis_message *msg_put;
+	struct stasis_message_type *msg_type;
+	const struct ast_eid *msg_eid;
+	const char *msg_id;
 
 	ast_assert(caching_topic != NULL);
 	ast_assert(caching_topic->topic != NULL);
@@ -741,58 +746,44 @@
 		caching_topic_needs_unref = NULL;
 	}
 
-	/* Handle cache clear event */
-	if (stasis_cache_clear_type() == stasis_message_type(message)) {
-		struct stasis_message *clear_msg = stasis_message_data(message);
-		const char *clear_id = caching_topic->cache->id_fn(clear_msg);
-		struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
-
-		ast_assert(clear_type != NULL);
-
-		if (clear_id) {
-			snapshots = cache_put(caching_topic->cache, clear_type, clear_id,
-				stasis_message_eid(clear_msg), NULL);
-			if (snapshots.old) {
-				update = update_create(snapshots.old, NULL);
-				if (update) {
-					stasis_publish(caching_topic->topic, update);
-				}
-				ao2_cleanup(update);
-			} else {
-				ast_log(LOG_ERROR,
-					"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
-					stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
+	msg_type = stasis_message_type(message);
+	if (stasis_cache_clear_type() == msg_type) {
+		/* Cache clear event. */
+		msg_put = NULL;
+		msg = stasis_message_data(message);
+		msg_type = stasis_message_type(msg);
+	} else {
+		/* Normal cache update event. */
+		msg_put = message;
+		msg = message;
+	}
+	ast_assert(msg_type != NULL);
+
+	msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
+	msg_id = caching_topic->cache->id_fn(msg);
+	if (msg_id && msg_eid) {
+		struct stasis_message *update;
+		struct cache_put_snapshots snapshots;
+
+		/* Update the cache */
+		snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
+		if (snapshots.old || msg_put) {
+			update = update_create(snapshots.old, msg_put);
+			if (update) {
+				stasis_publish(caching_topic->topic, update);
 			}
-
-			if (snapshots.aggregate_old != snapshots.aggregate_new) {
-				update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
-				if (update) {
-					stasis_publish(caching_topic->topic, update);
-				}
-				ao2_cleanup(update);
+			ao2_cleanup(update);
+		} else {
+			ast_log(LOG_ERROR,
+				"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
+				stasis_topic_name(caching_topic->topic),
+				stasis_message_type_name(msg_type), msg_id);
+		}
+
+		if (snapshots.aggregate_old != snapshots.aggregate_new) {
+			if (snapshots.aggregate_new && caching_topic->cache->aggregate_post_fn) {
+				caching_topic->cache->aggregate_post_fn(snapshots.aggregate_new);
 			}
-
-			ao2_cleanup(snapshots.old);
-			ao2_cleanup(snapshots.aggregate_old);
-			ao2_cleanup(snapshots.aggregate_new);
-		}
-		ao2_cleanup(caching_topic_needs_unref);
-		return;
-	}
-
-	id = caching_topic->cache->id_fn(message);
-	if (id) {
-		/* Update the cache */
-		snapshots = cache_put(caching_topic->cache, stasis_message_type(message), id,
-			stasis_message_eid(message), message);
-
-		update = update_create(snapshots.old, message);
-		if (update) {
-			stasis_publish(caching_topic->topic, update);
-		}
-		ao2_cleanup(update);
-
-		if (snapshots.aggregate_old != snapshots.aggregate_new) {
 			update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
 			if (update) {
 				stasis_publish(caching_topic->topic, update);
@@ -804,6 +795,7 @@
 		ao2_cleanup(snapshots.aggregate_old);
 		ao2_cleanup(snapshots.aggregate_new);
 	}
+
 	ao2_cleanup(caching_topic_needs_unref);
 }
 

Modified: team/rmudgett/stasis_cache/tests/test_devicestate.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/tests/test_devicestate.c?view=diff&rev=408986&r1=408985&r2=408986
==============================================================================
--- team/rmudgett/stasis_cache/tests/test_devicestate.c (original)
+++ team/rmudgett/stasis_cache/tests/test_devicestate.c Wed Feb 26 18:12:58 2014
@@ -277,45 +277,49 @@
 }
 
 struct consumer {
-	ast_mutex_t lock;
 	ast_cond_t out;
 	int already_out;
+	int sig_on_non_aggregate_state;
+	int event_count;
 	enum ast_device_state state;
 	enum ast_device_state aggregate_state;
-	int sig_on_non_aggregate_state;
 };
 
-static void consumer_dtor(void *obj) {
+static void consumer_dtor(void *obj)
+{
 	struct consumer *consumer = obj;
 
-	ast_mutex_destroy(&consumer->lock);
 	ast_cond_destroy(&consumer->out);
 }
 
-static struct consumer *consumer_create(void) {
-	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+static void consumer_reset(struct consumer *consumer)
+{
+	consumer->already_out = 0;
+	consumer->event_count = 0;
+	consumer->state = AST_DEVICE_TOTAL;
+	consumer->aggregate_state = AST_DEVICE_TOTAL;
+}
+
+static struct consumer *consumer_create(void)
+{
+	struct consumer *consumer;
 
 	consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
-
 	if (!consumer) {
 		return NULL;
 	}
 
-	ast_mutex_init(&consumer->lock);
 	ast_cond_init(&consumer->out, NULL);
-	consumer->sig_on_non_aggregate_state = 0;
-
-	ao2_ref(consumer, +1);
+	consumer_reset(consumer);
+
 	return consumer;
 }
 
 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
 	struct consumer *consumer = data;
-	RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
 	struct stasis_cache_update *cache_update = stasis_message_data(message);
 	struct ast_device_state_message *device_state;
-	SCOPED_MUTEX(lock, &consumer->lock);
 
 	if (!cache_update->new_snapshot) {
 		return;
@@ -328,17 +332,22 @@
 		return;
 	}
 
-	if (device_state->eid) {
-		consumer->state = device_state->state;
-		if (consumer->sig_on_non_aggregate_state) {
-			consumer->sig_on_non_aggregate_state = 0;
+	{
+		SCOPED_AO2LOCK(lock, consumer);
+
+		++consumer->event_count;
+		if (device_state->eid) {
+			consumer->state = device_state->state;
+			if (consumer->sig_on_non_aggregate_state) {
+				consumer->sig_on_non_aggregate_state = 0;
+				consumer->already_out = 1;
+				ast_cond_signal(&consumer->out);
+			}
+		} else {
+			consumer->aggregate_state = device_state->state;
 			consumer->already_out = 1;
 			ast_cond_signal(&consumer->out);
 		}
-	} else {
-		consumer->aggregate_state = device_state->state;
-		consumer->already_out = 1;
-		ast_cond_signal(&consumer->out);
 	}
 }
 
@@ -360,57 +369,57 @@
 		.tv_nsec = start.tv_usec * 1000
 	};
 
-	SCOPED_MUTEX(lock, &consumer->lock);
-
-	if (consumer->already_out) {
-		consumer->already_out = 0;
-	}
-
-	while(1) {
-		res = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
+	SCOPED_AO2LOCK(lock, consumer);
+
+	while (!consumer->already_out) {
+		res = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
 		if (!res || res == ETIMEDOUT) {
 			break;
 		}
 	}
-	consumer->already_out = 0;
 }
 
 static int remove_device_states_cb(void *obj, void *arg, int flags)
 {
-	RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup);
+	struct stasis_message *msg = obj;
 	struct ast_device_state_message *device_state = stasis_message_data(msg);
+
 	if (strcmp(UNIT_TEST_DEVICE_IDENTIFIER, device_state->device)) {
-		msg = NULL;
+		/* Not a unit test device */
 		return 0;
 	}
 
 	msg = stasis_cache_clear_create(msg);
-	/* topic guaranteed to have been created by this point */
-	stasis_publish(ast_device_state_topic(device_state->device), msg);
+	if (msg) {
+		/* topic guaranteed to have been created by this point */
+		stasis_publish(ast_device_state_topic(device_state->device), msg);
+	}
+	ao2_cleanup(msg);
 	return 0;
 }
 
 static void cache_cleanup(int unused)
 {
-	RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
+	struct ao2_container *cache_dump;
+
 	/* remove all device states created during this test */
-	cache_dump = stasis_cache_dump(ast_device_state_cache(), NULL);
+	cache_dump = stasis_cache_dump_full(ast_device_state_cache(), NULL, NULL);
 	if (!cache_dump) {
 		return;
 	}
 	ao2_callback(cache_dump, 0, remove_device_states_cb, NULL);
-}
-
-/* BUGBUG this test currently fails because the device state is storing the information in a different place now. */
+	ao2_cleanup(cache_dump);
+}
+
 AST_TEST_DEFINE(device_state_aggregation_test)
 {
 	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
 	RAII_VAR(struct stasis_message_router *, device_msg_router, NULL, stasis_message_router_unsubscribe);
 	RAII_VAR(struct ast_eid *, foreign_eid, NULL, ast_free);
 	RAII_VAR(int, cleanup_cache, 0, cache_cleanup);
+	RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 	int res;
 	struct ast_device_state_message *device_state;
-	struct stasis_message *msg;
 
 	switch (cmd) {
 	case TEST_INIT:
@@ -448,56 +457,67 @@
 	/* push local state */
 	ast_publish_device_state(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_NOT_INUSE, AST_DEVSTATE_CACHABLE);
 
+	/* Check cache aggregate state immediately */
+	ao2_cleanup(msg);
+	msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+	device_state = stasis_message_data(msg);
+	ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
+
 	consumer_wait_for(consumer);
 	ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state);
 	ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state);
-
-	msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
-	device_state = stasis_message_data(msg);
-	ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
-	ao2_cleanup(msg);
-	msg = NULL;
+	ast_test_validate(test, 2 == consumer->event_count);
+	consumer_reset(consumer);
 
 	/* push remote state */
 	/* this will not produce a new aggregate state message since the aggregate state does not change */
 	consumer->sig_on_non_aggregate_state = 1;
 	ast_publish_device_state_full(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_NOT_INUSE, AST_DEVSTATE_CACHABLE, foreign_eid);
 
+	/* Check cache aggregate state immediately */
+	ao2_cleanup(msg);
+	msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+	device_state = stasis_message_data(msg);
+	ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
+
+	/* Check for expected events. */
 	consumer_wait_for(consumer);
 	ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state);
-	ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state);
-
-	msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
-	device_state = stasis_message_data(msg);
-	ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state);
-	ao2_cleanup(msg);
-	msg = NULL;
+	ast_test_validate(test, AST_DEVICE_TOTAL == consumer->aggregate_state);
+	ast_test_validate(test, 1 == consumer->event_count);
+	consumer_reset(consumer);
 
 	/* push remote state different from local state */
 	ast_publish_device_state_full(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_INUSE, AST_DEVSTATE_CACHABLE, foreign_eid);
 
+	/* Check cache aggregate state immediately */
+	ao2_cleanup(msg);
+	msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+	device_state = stasis_message_data(msg);
+	ast_test_validate(test, AST_DEVICE_INUSE == device_state->state);
+
+	/* Check for expected events. */
 	consumer_wait_for(consumer);
 	ast_test_validate(test, AST_DEVICE_INUSE == consumer->state);
 	ast_test_validate(test, AST_DEVICE_INUSE == consumer->aggregate_state);
-
-	msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
-	device_state = stasis_message_data(msg);
-	ast_test_validate(test, AST_DEVICE_INUSE == device_state->state);
-	ao2_cleanup(msg);
-	msg = NULL;
+	ast_test_validate(test, 2 == consumer->event_count);
+	consumer_reset(consumer);
 
 	/* push local state that will cause aggregated state different from local non-aggregate state */
 	ast_publish_device_state(UNIT_TEST_DEVICE_IDENTIFIER, AST_DEVICE_RINGING, AST_DEVSTATE_CACHABLE);
 
+	/* Check cache aggregate state immediately */
+	ao2_cleanup(msg);
+	msg = stasis_cache_get_by_eid(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER, NULL);
+	device_state = stasis_message_data(msg);
+	ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state);
+
+	/* Check for expected events. */
 	consumer_wait_for(consumer);
 	ast_test_validate(test, AST_DEVICE_RINGING == consumer->state);
 	ast_test_validate(test, AST_DEVICE_RINGINUSE == consumer->aggregate_state);
-
-	msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER);
-	device_state = stasis_message_data(msg);
-	ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state);
-	ao2_cleanup(msg);
-	msg = NULL;
+	ast_test_validate(test, 2 == consumer->event_count);
+	consumer_reset(consumer);
 
 	return AST_TEST_PASS;
 }




More information about the asterisk-commits mailing list