[asterisk-commits] rmudgett: branch rmudgett/stasis_cache r408874 - in /team/rmudgett/stasis_cac...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Feb 24 18:15:19 CST 2014


Author: rmudgett
Date: Mon Feb 24 18:15:17 2014
New Revision: 408874

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=408874
Log:
stasis: Add support for aggregate cache and messages.

Modified:
    team/rmudgett/stasis_cache/include/asterisk/stasis.h
    team/rmudgett/stasis_cache/main/app.c
    team/rmudgett/stasis_cache/main/stasis_cache.c
    team/rmudgett/stasis_cache/main/stasis_message.c

Modified: team/rmudgett/stasis_cache/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/include/asterisk/stasis.h?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/include/asterisk/stasis.h (original)
+++ team/rmudgett/stasis_cache/include/asterisk/stasis.h Mon Feb 24 18:15:17 2014
@@ -307,7 +307,7 @@
  *
  * \param type Type of the message
  * \param data Immutable data that is the actual contents of the message
- * \param eid What entity originated this message.
+ * \param eid What entity originated this message. (NULL for aggregate)
  *
  * \return New message
  * \return \c NULL on error
@@ -319,8 +319,8 @@
  *
  * \param msg Message to get eid.
  *
- * \return Entity id of \a msg
- * \return \c NULL if \a msg is \c NULL.
+ * \retval Entity id of \a msg
+ * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
  */
 const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
 
@@ -668,6 +668,9 @@
  */
 struct stasis_cache;
 
+/*! Cache entry used for calculating the aggregate snapshot. */
+struct stasis_cache_entry;
+
 /*!
  * \brief A topic wrapper, which caches certain messages.
  * \since 12
@@ -689,6 +692,61 @@
 typedef const char *(*snapshot_get_id)(struct stasis_message *message);
 
 /*!
+ * \brief Callback to calculate the aggregate cache entry.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to calculate a new aggregate snapshot.
+ * \param new_snapshot The shapshot that is being updated.
+ *
+ * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
+ * if a new aggregate could not be calculated because of error.
+ *
+ * \return New aggregate-snapshot calculated on success.
+ * Caller has a reference on return.
+ */
+typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
+
+/*!
+ * \brief Get the aggregate cache entry snapshot.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to get the aggregate snapshot.
+ *
+ * \note A reference is not given to the returned pointer so don't unref it.
+ *
+ * \retval Aggregate-snapshot in cache.
+ * \retval NULL if not present.
+ */
+struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
+
+/*!
+ * \brief Get the internal cache entry snapshot.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to get the internal snapshot.
+ *
+ * \note A reference is not given to the returned pointer so don't unref it.
+ *
+ * \retval Internal-snapshot in cache.
+ * \retval NULL if not present.
+ */
+struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry);
+
+/*!
+ * \brief Get the external cache entry snapshot by index.
+ * \since 12.2.0
+ *
+ * \param entry Cache entry to get the external snapshot.
+ * \param idx Which external snapshot to get.
+ *
+ * \note A reference is not given to the returned pointer so don't unref it.
+ *
+ * \retval External-snapshot in cache.
+ * \retval NULL if not present.
+ */
+struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx);
+
+/*!
  * \brief Create a cache.
  *
  * This is the backend store for a \ref stasis_caching_topic. The cache is
@@ -697,11 +755,31 @@
  * The returned object is AO2 managed, so ao2_cleanup() when you're done.
  *
  * \param id_fn Callback to extract the id from a snapshot message.
- * \return New cache indexed by \a id_fn.
- * \return \c NULL on error
+ *
+ * \retval New cache indexed by \a id_fn.
+ * \retval \c NULL on error
+ *
  * \since 12
  */
 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
+
+/*!
+ * \brief Create a cache.
+ *
+ * This is the backend store for a \ref stasis_caching_topic. The cache is
+ * thread safe, allowing concurrent reads and writes.
+ *
+ * The returned object is AO2 managed, so ao2_cleanup() when you're done.
+ *
+ * \param id_fn Callback to extract the id from a snapshot message.
+ * \param aggregate_fn Callback to calculate the aggregate cache entry.
+ *
+ * \retval New cache indexed by \a id_fn.
+ * \retval \c NULL on error
+ *
+ * \since 12.2.0
+ */
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn);
 
 /*!
  * \brief Create a topic which monitors and caches messages from another topic.
@@ -800,7 +878,7 @@
  * \param cache The cache to query.
  * \param type Type of message to retrieve.
  * \param id Identity of the snapshot to retrieve.
- * \param eid Specific entity id to retrieve.  NULL if any entity, but prefer internal.
+ * \param eid Specific entity id to retrieve.  NULL for aggregate.
  *
  * \retval Message from the cache.
  * \retval \c NULL if message is not found.

Modified: team/rmudgett/stasis_cache/main/app.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/app.c?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/main/app.c (original)
+++ team/rmudgett/stasis_cache/main/app.c Mon Feb 24 18:15:17 2014
@@ -2822,6 +2822,7 @@
 }
 
 /* BUGBUG mwi_state consumers need to be updated to deal with the cache eid separation for aggregated state. */
+/* BUGBUG mwi_state needs to be treated just like device state.  It has an aggregate for multiple eid sources. */
 int ast_publish_mwi_state_full(
 			const char *mailbox,
 			const char *context,

Modified: team/rmudgett/stasis_cache/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_cache.c?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_cache.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_cache.c Mon Feb 24 18:15:17 2014
@@ -48,6 +48,7 @@
 struct stasis_cache {
 	struct ao2_container *entries;
 	snapshot_get_id id_fn;
+	cache_aggregate_calc_fn aggregate_fn;
 };
 
 /*! \internal */
@@ -130,8 +131,10 @@
 	const char *id;
 };
 
-struct cache_entry {
+struct stasis_cache_entry {
 	struct cache_entry_key key;
+	/*! Aggregate snapshot of the stasis cache. */
+	struct stasis_message *aggregate;
 	/*! Internal snapshot of the stasis event. */
 	struct stasis_message *internal;
 	/*! External snapshots of the stasis event. */
@@ -140,7 +143,7 @@
 
 static void cache_entry_dtor(void *obj)
 {
-	struct cache_entry *entry = obj;
+	struct stasis_cache_entry *entry = obj;
 	size_t idx;
 
 	ao2_cleanup(entry->key.type);
@@ -148,6 +151,8 @@
 	ast_free((char *) entry->key.id);
 	entry->key.id = NULL;
 
+	ao2_cleanup(entry->aggregate);
+	entry->aggregate = NULL;
 	ao2_cleanup(entry->internal);
 	entry->internal = NULL;
 
@@ -160,9 +165,9 @@
 	AST_VECTOR_FREE(&entry->external);
 }
 
-static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
-{
-	struct cache_entry *entry;
+static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
+{
+	struct stasis_cache_entry *entry;
 	int is_external;
 
 	ast_assert(type != NULL);
@@ -203,7 +208,7 @@
 
 static int cache_entry_hash(const void *obj, int flags)
 {
-	const struct cache_entry *object;
+	const struct stasis_cache_entry *object;
 	const struct cache_entry_key *key;
 	int hash = 0;
 
@@ -228,8 +233,8 @@
 
 static int cache_entry_cmp(void *obj, void *arg, int flags)
 {
-	const struct cache_entry *object_left = obj;
-	const struct cache_entry *object_right = arg;
+	const struct stasis_cache_entry *object_left = obj;
+	const struct stasis_cache_entry *object_right = arg;
 	const struct cache_entry_key *right_key = obj;
 	int cmp;
 
@@ -272,7 +277,7 @@
 	cache->entries = NULL;
 }
 
-struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_fn)
 {
 	struct stasis_cache *cache;
 
@@ -290,8 +295,54 @@
 	}
 
 	cache->id_fn = id_fn;
+	cache->aggregate_fn = aggregate_fn;
 
 	return cache;
+}
+
+struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
+{
+	return stasis_cache_create_full(id_fn, NULL);
+}
+
+struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
+{
+	return entry->aggregate;
+}
+
+struct stasis_message *stasis_cache_entry_get_internal(struct stasis_cache_entry *entry)
+{
+	return entry->internal;
+}
+
+struct stasis_message *stasis_cache_entry_get_external(struct stasis_cache_entry *entry, int idx)
+{
+	if (idx < AST_VECTOR_SIZE(&entry->external)) {
+		return AST_VECTOR_GET(&entry->external, idx);
+	}
+	return NULL;
+}
+
+/*!
+ * \internal
+ * \brief Find the cache entry in the cache entries container.
+ *
+ * \param entries Container of cached entries.
+ * \param type Type of message to retrieve the cache entry.
+ * \param id Identity of the snapshot to retrieve the cache entry.
+ *
+ * \note The entries container is already locked.
+ *
+ * \retval Cache-entry on success.
+ * \retval NULL Not in cache.
+ */
+static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
+{
+	struct cache_entry_key search_key;
+
+	search_key.type = type;
+	search_key.id = id;
+	return ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
 }
 
 /*!
@@ -306,7 +357,7 @@
  *
  * \return Previous stasis entry snapshot.
  */
-static struct stasis_message *cache_remove(struct ao2_container *entries, struct cache_entry *cached_entry, const struct ast_eid *eid)
+static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
 {
 	struct stasis_message *old_snapshot;
 	int is_external;
@@ -347,7 +398,7 @@
  *
  * \return Previous stasis entry snapshot.
  */
-static struct stasis_message *cache_udpate(struct cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
+static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
 {
 	struct stasis_message *old_snapshot;
 	int is_external;
@@ -377,33 +428,42 @@
 	return old_snapshot;
 }
 
-static struct stasis_message *cache_put(struct stasis_cache *cache,
+struct cache_put_snapshots {
+	/*! Old cache eid snapshot. */
+	struct stasis_message *old;
+	/*! Old cache aggregate snapshot. */
+	struct stasis_message *aggregate_old;
+	/*! New cache aggregate snapshot. */
+	struct stasis_message *aggregate_new;
+};
+
+static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
 	struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
 	struct stasis_message *new_snapshot)
 {
-	struct cache_entry_key search_key;
-	struct cache_entry *cached_entry;
-	struct stasis_message *old_snapshot = NULL;
+	struct stasis_cache_entry *cached_entry;
+	struct cache_put_snapshots snapshots;
 
 	ast_assert(cache->entries != NULL);
-	ast_assert(eid != NULL);
+	ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
 	ast_assert(new_snapshot == NULL ||
 		type == stasis_message_type(new_snapshot));
 
+	memset(&snapshots, 0, sizeof(snapshots));
+
 	ao2_wrlock(cache->entries);
 
-	search_key.type = type;
-	search_key.id = id;
-	cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
-
+	cached_entry = cache_find(cache->entries, type, id);
+
+	/* Update the eid snapshot. */
 	if (!new_snapshot) {
 		/* Remove snapshot from cache */
 		if (cached_entry) {
-			old_snapshot = cache_remove(cache->entries, cached_entry, eid);
+			snapshots.old = cache_remove(cache->entries, cached_entry, eid);
 		}
 	} else if (cached_entry) {
 		/* Update snapshot in cache */
-		old_snapshot = cache_udpate(cached_entry, eid, new_snapshot);
+		snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
 	} else {
 		/* Insert into the cache */
 		cached_entry = cache_entry_create(type, id, new_snapshot);
@@ -412,15 +472,22 @@
 		}
 	}
 
+	/* Update the aggregate snapshot. */
+	if (cache->aggregate_fn && cached_entry) {
+		snapshots.aggregate_new = cache->aggregate_fn(cached_entry, new_snapshot);
+		snapshots.aggregate_old = cached_entry->aggregate;
+		cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
+	}
+
 	ao2_unlock(cache->entries);
 
 	ao2_cleanup(cached_entry);
-	return old_snapshot;
+	return snapshots;
 }
 
 /*!
  * \internal
- * \brief Dump all snapshots in the cache entry into the given container.
+ * \brief Dump all entity snapshots in the cache entry into the given container.
  *
  * \param snapshots Container to put all snapshots in the cache entry.
  * \param entry Cache entry to use.
@@ -428,7 +495,7 @@
  * \retval 0 on success.
  * \retval -1 on error.
  */
-static int cache_entry_dump(struct ao2_container *snapshots, const struct cache_entry *entry)
+static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
 {
 	int idx;
 	int err = 0;
@@ -436,6 +503,8 @@
 	ast_assert(snapshots != NULL);
 	ast_assert(entry != NULL);
 
+	/* The aggregate snapshot is not a snapshot from an entity. */
+
 	if (entry->internal) {
 		err |= !ao2_link(snapshots, entry->internal);
 	}
@@ -452,8 +521,7 @@
 
 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 {
-	struct cache_entry_key search_key;
-	struct cache_entry *cached_entry;
+	struct stasis_cache_entry *cached_entry;
 	struct ao2_container *found;
 
 	ast_assert(cache != NULL);
@@ -468,9 +536,7 @@
 
 	ao2_rdlock(cache->entries);
 
-	search_key.type = type;
-	search_key.id = id;
-	cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	cached_entry = cache_find(cache->entries, type, id);
 	if (cached_entry && cache_entry_dump(found, cached_entry)) {
 		ao2_cleanup(found);
 		found = NULL;
@@ -487,53 +553,44 @@
  * \brief Retrieve an item from the cache entry for a specific eid.
  *
  * \param entry Cache entry to use.
- * \param eid Specific entity id to retrieve.  NULL if any entity, but prefer internal.
+ * \param eid Specific entity id to retrieve.  NULL for aggregate.
  *
  * \note The returned snapshot has not had its reference bumped.
  *
  * \retval Snapshot from the cache.
  * \retval \c NULL if snapshot is not found.
  */
-static struct stasis_message *cache_entry_by_eid(const struct cache_entry *entry, const struct ast_eid *eid)
-{
-	struct stasis_message *snapshot = NULL;
-
-	if (eid) {
-		int is_external;
-
-		/* Get snapshot with specific eid. */
-		is_external = ast_eid_cmp(eid, &ast_eid_default);
-		if (!is_external) {
-			snapshot = entry->internal;
-		} else {
-			int idx;
-
-			for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
-				struct stasis_message *cur;
-
-				cur = AST_VECTOR_GET(&entry->external, idx);
-				if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
-					snapshot = cur;
-					break;
-				}
-			}
-		}
-	} else {
-		/* Get any snapshot, but prefer internal. */
-		if (entry->internal) {
-			snapshot = entry->internal;
-		} else if (AST_VECTOR_SIZE(&entry->external)) {
-			snapshot = AST_VECTOR_GET(&entry->external, 0);
-		}
-	}
-
-	return snapshot;
+static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
+{
+	int is_external;
+	int idx;
+
+	if (!eid) {
+		/* Get aggregate. */
+		return entry->aggregate;
+	}
+
+	/* Get snapshot with specific eid. */
+	is_external = ast_eid_cmp(eid, &ast_eid_default);
+	if (!is_external) {
+		return entry->internal;
+	}
+
+	for (idx = 0; idx < AST_VECTOR_SIZE(&entry->external); ++idx) {
+		struct stasis_message *cur;
+
+		cur = AST_VECTOR_GET(&entry->external, idx);
+		if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
+			return cur;
+		}
+	}
+
+	return NULL;
 }
 
 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 {
-	struct cache_entry_key search_key;
-	struct cache_entry *cached_entry;
+	struct stasis_cache_entry *cached_entry;
 	struct stasis_message *snapshot = NULL;
 
 	ast_assert(cache != NULL);
@@ -543,9 +600,7 @@
 
 	ao2_rdlock(cache->entries);
 
-	search_key.type = type;
-	search_key.id = id;
-	cached_entry = ao2_find(cache->entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+	cached_entry = cache_find(cache->entries, type, id);
 	if (cached_entry) {
 		snapshot = cache_entry_by_eid(cached_entry, eid);
 		ao2_bump(snapshot);
@@ -571,7 +626,7 @@
 static int cache_dump_cb(void *obj, void *arg, int flags)
 {
 	struct cache_dump_data *cache_dump = arg;
-	struct cache_entry *entry = obj;
+	struct stasis_cache_entry *entry = obj;
 
 	if (!cache_dump->type || entry->key.type == cache_dump->type) {
 		if (cache_dump->eid) {
@@ -669,9 +724,11 @@
 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
 	struct stasis_message *message)
 {
-	RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
+	struct stasis_caching_topic *caching_topic_needs_unref;
 	struct stasis_caching_topic *caching_topic = data;
-	const char *id = NULL;
+	const char *id;
+	struct stasis_message *update;
+	struct cache_put_snapshots snapshots;
 
 	ast_assert(caching_topic != NULL);
 	ast_assert(caching_topic->topic != NULL);
@@ -680,6 +737,8 @@
 
 	if (stasis_subscription_final_message(sub, message)) {
 		caching_topic_needs_unref = caching_topic;
+	} else {
+		caching_topic_needs_unref = NULL;
 	}
 
 	/* Handle cache clear event */
@@ -691,45 +750,61 @@
 		ast_assert(clear_type != NULL);
 
 		if (clear_id) {
-			struct stasis_message *old_snapshot;
-
-			old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id,
+			snapshots = cache_put(caching_topic->cache, clear_type, clear_id,
 				stasis_message_eid(clear_msg), NULL);
-			if (old_snapshot) {
-				struct stasis_message *update;
-
-				update = update_create(old_snapshot, NULL);
+			if (snapshots.old) {
+				update = update_create(snapshots.old, NULL);
 				if (update) {
 					stasis_publish(caching_topic->topic, update);
 				}
 				ao2_cleanup(update);
-				ao2_cleanup(old_snapshot);
-				return;
+			} else {
+				ast_log(LOG_ERROR,
+					"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
+					stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
 			}
 
-			ast_log(LOG_ERROR,
-				"Attempting to remove an item from the %s cache that isn't there: %s %s\n",
-				stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
-		}
+			if (snapshots.aggregate_old != snapshots.aggregate_new) {
+				update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+				if (update) {
+					stasis_publish(caching_topic->topic, update);
+				}
+				ao2_cleanup(update);
+			}
+
+			ao2_cleanup(snapshots.old);
+			ao2_cleanup(snapshots.aggregate_old);
+			ao2_cleanup(snapshots.aggregate_new);
+		}
+		ao2_cleanup(caching_topic_needs_unref);
 		return;
 	}
 
 	id = caching_topic->cache->id_fn(message);
 	if (id) {
 		/* Update the cache */
-		struct stasis_message *old_snapshot;
-		struct stasis_message *update;
-
-		old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id,
+		snapshots = cache_put(caching_topic->cache, stasis_message_type(message), id,
 			stasis_message_eid(message), message);
 
-		update = update_create(old_snapshot, message);
+		update = update_create(snapshots.old, message);
 		if (update) {
 			stasis_publish(caching_topic->topic, update);
 		}
 		ao2_cleanup(update);
-		ao2_cleanup(old_snapshot);
-	}
+
+		if (snapshots.aggregate_old != snapshots.aggregate_new) {
+			update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
+			if (update) {
+				stasis_publish(caching_topic->topic, update);
+			}
+			ao2_cleanup(update);
+		}
+
+		ao2_cleanup(snapshots.old);
+		ao2_cleanup(snapshots.aggregate_old);
+		ao2_cleanup(snapshots.aggregate_new);
+	}
+	ao2_cleanup(caching_topic_needs_unref);
 }
 
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)

Modified: team/rmudgett/stasis_cache/main/stasis_message.c
URL: http://svnview.digium.com/svn/asterisk/team/rmudgett/stasis_cache/main/stasis_message.c?view=diff&rev=408874&r1=408873&r2=408874
==============================================================================
--- team/rmudgett/stasis_cache/main/stasis_message.c (original)
+++ team/rmudgett/stasis_cache/main/stasis_message.c Mon Feb 24 18:15:17 2014
@@ -85,6 +85,8 @@
 	struct timeval timestamp;
 	/*! Type of the message */
 	struct stasis_message_type *type;
+	/*! Where this message originated.  NULL if aggregate message. */
+	const struct ast_eid *eid_ptr;
 	/*! Message content */
 	void *data;
 	/*! Where this message originated. */
@@ -102,7 +104,7 @@
 {
 	struct stasis_message *message;
 
-	if (type == NULL || data == NULL || eid == NULL) {
+	if (type == NULL || data == NULL) {
 		return NULL;
 	}
 
@@ -116,7 +118,10 @@
 	message->type = type;
 	ao2_ref(data, +1);
 	message->data = data;
-	message->eid = *eid;
+	if (eid) {
+		message->eid_ptr = &message->eid;
+		message->eid = *eid;
+	}
 
 	return message;
 }
@@ -131,7 +136,7 @@
 	if (msg == NULL) {
 		return NULL;
 	}
-	return &msg->eid;
+	return msg->eid_ptr;
 }
 
 struct stasis_message_type *stasis_message_type(const struct stasis_message *msg)




More information about the asterisk-commits mailing list