[asterisk-commits] dlee: branch dlee/stasis-demo r386047 - /team/dlee/stasis-demo/res/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Thu Apr 18 23:01:14 CDT 2013
Author: dlee
Date: Thu Apr 18 23:01:12 2013
New Revision: 386047
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=386047
Log:
Cleaned up the example.
Modified:
team/dlee/stasis-demo/res/res_chan_stats.c
Modified: team/dlee/stasis-demo/res/res_chan_stats.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/stasis-demo/res/res_chan_stats.c?view=diff&rev=386047&r1=386046&r2=386047
==============================================================================
--- team/dlee/stasis-demo/res/res_chan_stats.c (original)
+++ team/dlee/stasis-demo/res/res_chan_stats.c Thu Apr 18 23:01:12 2013
@@ -38,15 +38,37 @@
#include "asterisk/module.h"
#include "asterisk/stasis_channels.h"
+#include "asterisk/stasis_message_router.h"
#include "asterisk/statsd.h"
#include "asterisk/time.h"
+/*! Regular Stasis subscription */
static struct stasis_subscription *sub;
+/*! Stasis message router */
+static struct stasis_message_router *router;
+/*!
+ * \brief Subscription callback for all channel messages.
+ * \param data Data pointer given when creating the subscription.
+ * \param sub This subscription.
+ * \param topic The topic the message was posted to. This is not necessarily the
+ * topic you subscribed to, since messages may be forwarded between
+ * topics.
+ * \param message The message itself.
+ */
static void statsmaker(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
{
RAII_VAR(struct ast_str *, metric, NULL, ast_free);
+
+ if (stasis_subscription_final_message(sub, message)) {
+ /* Normally, data points to an object that must be cleaned up.
+ * The final message is an unsubscribe notification that's
+ * guaranteed to be the last message this subscription receives.
+ * This would be a safe place to kick off any needed cleanup.
+ */
+ return;
+ }
/* For no good reason, count message types */
metric = ast_str_create(80);
@@ -55,33 +77,93 @@
stasis_message_type_name(stasis_message_type(message)));
ast_statsd_log(ast_str_buffer(metric), AST_STATSD_METER, 1);
}
+}
- if (stasis_cache_update_type() == stasis_message_type(message)) {
- struct stasis_cache_update *update = stasis_message_data(message);
+/*!
+ * \brief Router callback for \ref stasis_cache_update messages.
+ * \param data Data pointer given when added to router.
+ * \param sub This subscription.
+ * \param topic The topic the message was posted to. This is not necessarily the
+ * topic you subscribed to, since messages may be forwarded between
+ * topics.
+ * \param message The message itself.
+ */
+static void updates(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ /* Since this came from a message router, we know the type of the
+ * message. We can cast the data without checking its type.
+ */
+ struct stasis_cache_update *update = stasis_message_data(message);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
- if (!update->old_snapshot && update->new_snapshot) {
- ast_statsd_log("channels.count", AST_STATSD_COUNTER, 1);
- } else if (update->old_snapshot && !update->new_snapshot) {
- struct ast_channel_snapshot *last;
- int64_t age;
+ /* We're only interested in channel snapshots, so check the type
+ * of the underlying message.
+ */
+ if (ast_channel_snapshot_type() != update->type) {
+ return;
+ }
- last = stasis_message_data(update->old_snapshot);
- age = ast_tvdiff_ms(*stasis_message_timestamp(message),
- last->creationtime);
- ast_statsd_log("channels.count", AST_STATSD_COUNTER, -1);
- ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age);
- }
+ /* There are three types of cache updates.
+ * !old && new -> Initial cache entry
+ * old && new -> Updated cache entry
+ * old && !new -> Cache entry removed.
+ */
+
+ if (!update->old_snapshot && update->new_snapshot) {
+ /* Initial cache entry; count a channel creation */
+ ast_statsd_log("channels.count", AST_STATSD_COUNTER, 1);
+ } else if (update->old_snapshot && !update->new_snapshot) {
+ /* Cache entry removed. Compute the age of the channel and post
+ * that, as well as decrementing the channel count.
+ */
+ struct ast_channel_snapshot *last;
+ int64_t age;
+
+ last = stasis_message_data(update->old_snapshot);
+ age = ast_tvdiff_ms(*stasis_message_timestamp(message),
+ last->creationtime);
+ ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age);
+
+ /* And decrement the channel count */
+ ast_statsd_log("channels.count", AST_STATSD_COUNTER, -1);
+ }
+}
+
+/*!
+ * \brief Router callback for any message that doesn't otherwise have a route.
+ * \param data Data pointer given when added to router.
+ * \param sub This subscription.
+ * \param topic The topic the message was posted to. This is not necessarily the
+ * topic you subscribed to, since messages may be forwarded between
+ * topics.
+ * \param message The message itself.
+ */
+static void default_route(void *data, struct stasis_subscription *sub,
+ struct stasis_topic *topic, struct stasis_message *message)
+{
+ if (stasis_subscription_final_message(sub, message)) {
+ /* Much like with the regular subscription, you may need to
+ * perform some cleanup when done with a message router. You
+ * can look for the final message in the default route.
+ */
+ return;
}
}
static int load_module(void)
{
- sub = stasis_subscribe(
- stasis_caching_get_topic(ast_channel_topic_all_cached()),
- statsmaker, NULL);
+ /* You can create a message router to route messages by type */
+ router = stasis_message_router_create(
+ stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ stasis_message_router_add(router, stasis_cache_update_type(),
+ updates, NULL);
+ stasis_message_router_set_default(router, default_route, NULL);
+
+ /* Or a subscription to receive all of the messages from a topic */
+ sub = stasis_subscribe(ast_channel_topic_all(), statsmaker, NULL);
if (!sub) {
return AST_MODULE_LOAD_FAILURE;
}
@@ -92,6 +174,8 @@
{
stasis_unsubscribe(sub);
sub = NULL;
+ stasis_message_router_unsubscribe(router);
+ router = NULL;
return 0;
}
More information about the asterisk-commits
mailing list