[asterisk-commits] dlee: branch dlee/stasis-demo r386047 - /team/dlee/stasis-demo/res/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Apr 18 23:01:14 CDT 2013


Author: dlee
Date: Thu Apr 18 23:01:12 2013
New Revision: 386047

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=386047
Log:
Cleaned up the example.

Modified:
    team/dlee/stasis-demo/res/res_chan_stats.c

Modified: team/dlee/stasis-demo/res/res_chan_stats.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-demo/res/res_chan_stats.c?view=diff&rev=386047&r1=386046&r2=386047
==============================================================================
--- team/dlee/stasis-demo/res/res_chan_stats.c (original)
+++ team/dlee/stasis-demo/res/res_chan_stats.c Thu Apr 18 23:01:12 2013
@@ -38,15 +38,37 @@
 
 #include "asterisk/module.h"
 #include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_message_router.h"
 #include "asterisk/statsd.h"
 #include "asterisk/time.h"
 
+/*! Regular Stasis subscription */
 static struct stasis_subscription *sub;
+/*! Stasis message router */
+static struct stasis_message_router *router;
 
+/*!
+ * \brief Subscription callback for all channel messages.
+ * \param data Data pointer given when creating the subscription.
+ * \param sub This subscription.
+ * \param topic The topic the message was posted to. This is not necessarily the
+ *              topic you subscribed to, since messages may be forwarded between
+ *              topics.
+ * \param message The message itself.
+ */
 static void statsmaker(void *data, struct stasis_subscription *sub,
 	struct stasis_topic *topic, struct stasis_message *message)
 {
 	RAII_VAR(struct ast_str *, metric, NULL, ast_free);
+
+	if (stasis_subscription_final_message(sub, message)) {
+		/* Normally, data points to an object that must be cleaned up.
+		 * The final message is an unsubscribe notification that's
+		 * guaranteed to be the last message this subscription receives.
+		 * This would be a safe place to kick off any needed cleanup.
+		 */
+		return;
+	}
 
 	/* For no good reason, count message types */
 	metric = ast_str_create(80);
@@ -55,33 +77,93 @@
 			stasis_message_type_name(stasis_message_type(message)));
 		ast_statsd_log(ast_str_buffer(metric), AST_STATSD_METER, 1);
 	}
+}
 
-	if (stasis_cache_update_type() == stasis_message_type(message)) {
-		struct stasis_cache_update *update = stasis_message_data(message);
+/*!
+ * \brief Router callback for \ref stasis_cache_update messages.
+ * \param data Data pointer given when added to router.
+ * \param sub This subscription.
+ * \param topic The topic the message was posted to. This is not necessarily the
+ *              topic you subscribed to, since messages may be forwarded between
+ *              topics.
+ * \param message The message itself.
+ */
+static void updates(void *data, struct stasis_subscription *sub,
+	struct stasis_topic *topic, struct stasis_message *message)
+{
+	/* Since this came from a message router, we know the type of the
+	 * message. We can cast the data without checking its type.
+	 */
+	struct stasis_cache_update *update = stasis_message_data(message);
 
-		if (ast_channel_snapshot_type() != update->type) {
-			return;
-		}
-		if (!update->old_snapshot && update->new_snapshot) {
-			ast_statsd_log("channels.count", AST_STATSD_COUNTER, 1);
-		} else if (update->old_snapshot && !update->new_snapshot) {
-			struct ast_channel_snapshot *last;
-			int64_t age;
+	/* We're only interested in channel snapshots, so check the type
+	 * of the underlying message.
+	 */
+	if (ast_channel_snapshot_type() != update->type) {
+		return;
+	}
 
-			last = stasis_message_data(update->old_snapshot);
-			age = ast_tvdiff_ms(*stasis_message_timestamp(message),
-				last->creationtime);
-			ast_statsd_log("channels.count", AST_STATSD_COUNTER, -1);
-			ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age);
-		}
+	/* There are three types of cache updates.
+	 * !old && new -> Initial cache entry
+	 * old && new -> Updated cache entry
+	 * old && !new -> Cache entry removed.
+	 */
+
+	if (!update->old_snapshot && update->new_snapshot) {
+		/* Initial cache entry; count a channel creation */
+		ast_statsd_log("channels.count", AST_STATSD_COUNTER, 1);
+	} else if (update->old_snapshot && !update->new_snapshot) {
+		/* Cache entry removed. Compute the age of the channel and post
+		 * that, as well as decrementing the channel count.
+		 */
+		struct ast_channel_snapshot *last;
+		int64_t age;
+
+		last = stasis_message_data(update->old_snapshot);
+		age = ast_tvdiff_ms(*stasis_message_timestamp(message),
+			last->creationtime);
+		ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age);
+
+		/* And decrement the channel count */
+		ast_statsd_log("channels.count", AST_STATSD_COUNTER, -1);
+	}
+}
+
+/*!
+ * \brief Router callback for any message that doesn't otherwise have a route.
+ * \param data Data pointer given when added to router.
+ * \param sub This subscription.
+ * \param topic The topic the message was posted to. This is not necessarily the
+ *              topic you subscribed to, since messages may be forwarded between
+ *              topics.
+ * \param message The message itself.
+ */
+static void default_route(void *data, struct stasis_subscription *sub,
+	struct stasis_topic *topic, struct stasis_message *message)
+{
+	if (stasis_subscription_final_message(sub, message)) {
+		/* Much like with the regular subscription, you may need to
+		 * perform some cleanup when done with a message router. You
+		 * can look for the final message in the default route.
+		 */
+		return;
 	}
 }
 
 static int load_module(void)
 {
-	sub = stasis_subscribe(
-		stasis_caching_get_topic(ast_channel_topic_all_cached()),
-		statsmaker, NULL);
+	/* You can create a message router to route messages by type */
+	router = stasis_message_router_create(
+		stasis_caching_get_topic(ast_channel_topic_all_cached()));
+	if (!router) {
+		return AST_MODULE_LOAD_FAILURE;
+	}
+	stasis_message_router_add(router, stasis_cache_update_type(),
+		updates, NULL);
+	stasis_message_router_set_default(router, default_route, NULL);
+
+	/* Or a subscription to receive all of the messages from a topic */
+	sub = stasis_subscribe(ast_channel_topic_all(), statsmaker, NULL);
 	if (!sub) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
@@ -92,6 +174,8 @@
 {
 	stasis_unsubscribe(sub);
 	sub = NULL;
+	stasis_message_router_unsubscribe(router);
+	router = NULL;
 	return 0;
 }
 




More information about the asterisk-commits mailing list