[Asterisk-code-review] stasis: Add statistics gathering in developer mode. (asterisk[master])

Friendly Automation asteriskteam at digium.com
Wed Dec 12 13:08:26 CST 2018


Friendly Automation has submitted this change and it was merged. ( https://gerrit.asterisk.org/10766 )

Change subject: stasis: Add statistics gathering in developer mode.
......................................................................

stasis: Add statistics gathering in developer mode.

This change adds statistics gathering to Stasis topics,
subscriptions, and message types. These can be viewed using
CLI commands and provide insight into how Stasis is used
and how long certain operations take to execute.

These are only available when Asterisk is compiled in
developer mode and do not have any impact under normal
operation.

ASTERISK-28117

Change-Id: I94411b53767f89ee01714daaecf0c2f1666e863f
---
M include/asterisk/stasis.h
M include/asterisk/stasis_internal.h
M include/asterisk/stasis_message_router.h
M main/asterisk.c
M main/asterisk.exports.in
M main/stasis.c
M main/stasis_cache.c
M main/stasis_message_router.c
8 files changed, 848 insertions(+), 14 deletions(-)

Approvals:
  George Joseph: Looks good to me, approved
  Friendly Automation: Approved for Submit



diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 6d423d9..2e274a6 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -604,8 +604,14 @@
  * has been subscribed. This occurs immediately before accepted message
  * types can be set and the callback must expect to receive it.
  */
+#ifdef AST_DEVMODE
+struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic,
+	stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
+#define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+#else
 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
 	stasis_subscription_cb callback, void *data);
+#endif
 
 /*!
  * \brief Create a subscription whose callbacks occur on a thread pool
@@ -633,8 +639,14 @@
  * has been subscribed. This occurs immediately before accepted message
  * types can be set and the callback must expect to receive it.
  */
+#ifdef AST_DEVMODE
+struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic,
+	stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
+#define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+#else
 struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
 	stasis_subscription_cb callback, void *data);
+#endif
 
 /*!
  * \brief Indicate to a subscription that we are interested in a message type.
diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h
index bc6122c..c9df032 100644
--- a/include/asterisk/stasis_internal.h
+++ b/include/asterisk/stasis_internal.h
@@ -60,11 +60,23 @@
  * \return \c NULL on error.
  * \since 12
  */
+#ifdef AST_DEVMODE
+struct stasis_subscription *internal_stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data,
+	int needs_mailbox,
+	int use_thread_pool,
+	const char *file,
+	int lineno,
+	const char *func);
+#else
 struct stasis_subscription *internal_stasis_subscribe(
 	struct stasis_topic *topic,
 	stasis_subscription_cb callback,
 	void *data,
 	int needs_mailbox,
 	int use_thread_pool);
+#endif
 
 #endif /* STASIS_INTERNAL_H_ */
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 9897d62..93a2140 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -55,8 +55,14 @@
  *
  * \since 12
  */
+#ifdef AST_DEVMODE
+struct stasis_message_router *__stasis_message_router_create(
+	struct stasis_topic *topic, const char *file, int lineno, const char *func);
+#define stasis_message_router_create(topic) __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+#else
 struct stasis_message_router *stasis_message_router_create(
 	struct stasis_topic *topic);
+#endif
 
 /*!
  * \brief Create a new message router object.
@@ -71,8 +77,14 @@
  *
  * \since 12.8.0
  */
+#ifdef AST_DEVMODE
+struct stasis_message_router *__stasis_message_router_create_pool(
+	struct stasis_topic *topic, const char *file, int lineno, const char *func);
+#define stasis_message_router_create_pool(topic) __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+#else
 struct stasis_message_router *stasis_message_router_create_pool(
 	struct stasis_topic *topic);
+#endif
 
 /*!
  * \brief Unsubscribe the router from the upstream topic.
diff --git a/main/asterisk.c b/main/asterisk.c
index 7eb49df..36e956f 100644
--- a/main/asterisk.c
+++ b/main/asterisk.c
@@ -4070,6 +4070,8 @@
 	check_init(ast_tps_init(), "Task Processor Core");
 	check_init(ast_fd_init(), "File Descriptor Debugging");
 	check_init(ast_pbx_init(), "ast_pbx_init");
+	check_init(aco_init(), "Configuration Option Framework");
+	check_init(stasis_init(), "Stasis");
 #ifdef TEST_FRAMEWORK
 	check_init(ast_test_init(), "Test Framework");
 #endif
@@ -4082,9 +4084,7 @@
 	check_init(ast_format_init(), "Formats");
 	check_init(ast_format_cache_init(), "Format Cache");
 	check_init(ast_codec_builtin_init(), "Built-in Codecs");
-	check_init(aco_init(), "Configuration Option Framework");
 	check_init(ast_bucket_init(), "Bucket API");
-	check_init(stasis_init(), "Stasis");
 	check_init(ast_stasis_system_init(), "Stasis system-level information");
 	check_init(ast_endpoint_stasis_init(), "Stasis Endpoint");
 
diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in
index f3549e6..0232855 100644
--- a/main/asterisk.exports.in
+++ b/main/asterisk.exports.in
@@ -27,6 +27,7 @@
 		LINKER_SYMBOL_PREFIXstrsep;
 		LINKER_SYMBOL_PREFIXsetenv;
 		LINKER_SYMBOL_PREFIXstasis_*;
+		LINKER_SYMBOL_PREFIX__stasis_*;
 		LINKER_SYMBOL_PREFIXunsetenv;
 		LINKER_SYMBOL_PREFIXstrcasestr;
 		LINKER_SYMBOL_PREFIXstrnlen;
diff --git a/main/stasis.c b/main/stasis.c
index 69ec1a5..3216e21 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -41,6 +41,9 @@
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/config_options.h"
+#ifdef AST_DEVMODE
+#include "asterisk/cli.h"
+#endif
 
 /*** DOCUMENTATION
 	<managerEvent language="en_US" name="UserEvent">
@@ -304,14 +307,67 @@
 
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
+#ifdef AST_DEVMODE
+
+/*! The number of buckets to use for topic statistics */
+#define TOPIC_STATISTICS_BUCKETS 57
+
+/*! The number of buckets to use for subscription statistics */
+#define SUBSCRIPTION_STATISTICS_BUCKETS 57
+
+/*! Container which stores statistics for topics */
+static struct ao2_container *topic_statistics;
+
+/*! Container which stores statistics for subscriptions */
+static struct ao2_container *subscription_statistics;
+
+/*! \internal \brief Per message type counters (developer mode only) */
+struct stasis_message_type_statistics {
+	/*! \brief The number of messages of this type that were published */
+	int published;
+	/*! \brief The number of messages of this type that did not reach a subscriber */
+	int unused;
+	/*! \brief The stasis message type these counters belong to */
+	struct stasis_message_type *message_type;
+};
+
+/*! Lock to protect the message types vector */
+AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock);
+
+/*! Vector containing message type information */
+static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics;
+
+/*! \internal \brief Per topic statistics (developer mode only) */
+struct stasis_topic_statistics {
+	/*! \brief The number of messages that were not dispatched to any subscriber */
+	int messages_not_dispatched;
+	/*! \brief The number of messages that were dispatched to at least 1 subscriber */
+	int messages_dispatched;
+	/*! \brief Highest time (in milliseconds) spent dispatching a message to subscribers */
+	int64_t highest_time_dispatched;
+	/*! \brief Lowest time (in milliseconds) spent dispatching a message to subscribers */
+	int64_t lowest_time_dispatched;
+	/*! \brief The number of subscribers to this topic */
+	int subscriber_count;
+	/*! \brief Name of the topic, stored inline at the end of the allocation */
+	char name[0];
+};
+#endif
+
 /*! \internal */
 struct stasis_topic {
-	char *name;
 	/*! Variable length array of the subscribers */
 	AST_VECTOR(, struct stasis_subscription *) subscribers;
 
 	/*! Topics forwarding into this topic */
 	AST_VECTOR(, struct stasis_topic *) upstream_topics;
+
+#ifdef AST_DEVMODE
+	struct stasis_topic_statistics *statistics;
+#endif
+
+	/*! Name of the topic */
+	char name[0];
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
@@ -337,28 +393,54 @@
 	 * unsubscribed before we get here. */
 	ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
 
-	ast_free(topic->name);
-	topic->name = NULL;
-
 	AST_VECTOR_FREE(&topic->subscribers);
 	AST_VECTOR_FREE(&topic->upstream_topics);
+
+#ifdef AST_DEVMODE
+	if (topic->statistics) {
+		ao2_unlink(topic_statistics, topic->statistics);
+		ao2_ref(topic->statistics, -1);
+	}
+#endif
 }
 
+#ifdef AST_DEVMODE
+static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
+{
+	struct stasis_topic_statistics *statistics;
+
+	statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, NULL);
+	if (!statistics) {
+		return NULL;
+	}
+
+	strcpy(statistics->name, name); /* SAFE */
+	ao2_link(topic_statistics, statistics);
+
+	return statistics;
+}
+#endif
+
 struct stasis_topic *stasis_topic_create(const char *name)
 {
 	struct stasis_topic *topic;
 	int res = 0;
 
-	topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
+	topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name);
 	if (!topic) {
 		return NULL;
 	}
 
-	topic->name = ast_strdup(name);
+	strcpy(topic->name, name); /* SAFE */
 	res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
 	res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
+#ifdef AST_DEVMODE
+	topic->statistics = stasis_topic_statistics_create(name);
+	if (!topic->name || !topic->statistics || res) {
+#else
 	if (!topic->name || res) {
-		ao2_cleanup(topic);
+#endif
+		ao2_ref(topic, -1);
 		return NULL;
 	}
 
@@ -375,6 +457,35 @@
 	return AST_VECTOR_SIZE(&topic->subscribers);
 }
 
+#ifdef AST_DEVMODE
+struct stasis_subscription_statistics {
+	/*! \brief The filename where the subscription originates */
+	const char *file;
+	/*! \brief The line number where the subscription originates */
+	int lineno;
+	/*! \brief The function where the subscription originates */
+	const char *func;
+	/*! \brief The number of messages that were filtered out */
+	int messages_dropped;
+	/*! \brief The number of messages that passed filtering */
+	int messages_passed;
+	/*! \brief Highest time (in milliseconds) spent invoking the callback for a message */
+	int64_t highest_time_invoked;
+	/*! \brief The message type that currently holds the highest invocation time */
+	struct stasis_message_type *highest_time_message_type;
+	/*! \brief Lowest time (in milliseconds) spent invoking the callback for a message */
+	int64_t lowest_time_invoked;
+	/*! \brief Using a mailbox to queue messages */
+	int uses_mailbox;
+	/*! \brief Using stasis threadpool for handling messages */
+	int uses_threadpool;
+	/*! \brief Name of the topic we subscribed to (points into the storage after uniqueid) */
+	char *topic;
+	/*! \brief Unique ID of the subscription, stored inline at the end of the allocation */
+	char uniqueid[0];
+};
+#endif
+
 /*! \internal */
 struct stasis_subscription {
 	/*! Unique ID for this subscription */
@@ -403,6 +514,11 @@
 	enum stasis_subscription_message_formatters accepted_formatters;
 	/*! The message filter currently in use */
 	enum stasis_subscription_message_filter filter;
+
+#ifdef AST_DEVMODE
+	/*! Statistics information */
+	struct stasis_subscription_statistics *statistics;
+#endif
 };
 
 static void subscription_dtor(void *obj)
@@ -423,6 +539,13 @@
 	ast_cond_destroy(&sub->join_cond);
 
 	AST_VECTOR_FREE(&sub->accepted_message_types);
+
+#ifdef AST_DEVMODE
+	if (sub->statistics) {
+		ao2_unlink(subscription_statistics, sub->statistics);
+		ao2_ref(sub->statistics, -1);
+	}
+#endif
 }
 
 /*!
@@ -436,6 +559,12 @@
 {
 	unsigned int final = stasis_subscription_final_message(sub, message);
 	int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+#ifdef AST_DEVMODE
+	struct timeval start;
+	int elapsed;
+
+	start = ast_tvnow();
+#endif
 
 	/* Notify that the final message has been received */
 	if (final) {
@@ -462,6 +591,19 @@
 		ast_cond_signal(&sub->join_cond);
 		ao2_unlock(sub);
 	}
+
+#ifdef AST_DEVMODE
+	elapsed = ast_tvdiff_ms(ast_tvnow(), start);
+	if (elapsed > sub->statistics->highest_time_invoked) {
+		sub->statistics->highest_time_invoked = elapsed;
+		ao2_lock(sub->statistics);
+		sub->statistics->highest_time_message_type = stasis_message_type(message);
+		ao2_unlock(sub->statistics);
+	}
+	if (elapsed < sub->statistics->lowest_time_invoked) {
+		sub->statistics->lowest_time_invoked = elapsed;
+	}
+#endif
 }
 
 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
@@ -471,12 +613,51 @@
 {
 }
 
+#ifdef AST_DEVMODE
+static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
+	const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
+	const char *func)
+{
+	struct stasis_subscription_statistics *statistics;
+	size_t uniqueid_len = strlen(uniqueid) + 1;
+
+	statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
+	if (!statistics) {
+		return NULL;
+	}
+
+	statistics->file = file;
+	statistics->lineno = lineno;
+	statistics->func = func;
+	statistics->uses_mailbox = needs_mailbox;
+	statistics->uses_threadpool = use_thread_pool;
+	strcpy(statistics->uniqueid, uniqueid); /* SAFE */
+	statistics->topic = statistics->uniqueid + uniqueid_len;
+	strcpy(statistics->topic, topic); /* SAFE */
+	ao2_link(subscription_statistics, statistics);
+
+	return statistics;
+}
+#endif
+
+#ifdef AST_DEVMODE
+struct stasis_subscription *internal_stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data,
+	int needs_mailbox,
+	int use_thread_pool,
+	const char *file,
+	int lineno,
+	const char *func)
+#else
 struct stasis_subscription *internal_stasis_subscribe(
 	struct stasis_topic *topic,
 	stasis_subscription_cb callback,
 	void *data,
 	int needs_mailbox,
 	int use_thread_pool)
+#endif
 {
 	struct stasis_subscription *sub;
 
@@ -491,6 +672,15 @@
 	}
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
+#ifdef AST_DEVMODE
+	sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
+		use_thread_pool, file, lineno, func);
+	if (!sub->statistics) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
+#endif
+
 	if (needs_mailbox) {
 		char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
 
@@ -538,6 +728,18 @@
 	return sub;
 }
 
+#ifdef AST_DEVMODE
+struct stasis_subscription *__stasis_subscribe(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data,
+	const char *file,
+	int lineno,
+	const char *func)
+{
+	return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func);
+}
+#else
 struct stasis_subscription *stasis_subscribe(
 	struct stasis_topic *topic,
 	stasis_subscription_cb callback,
@@ -545,7 +747,20 @@
 {
 	return internal_stasis_subscribe(topic, callback, data, 1, 0);
 }
+#endif
 
+#ifdef AST_DEVMODE
+struct stasis_subscription *__stasis_subscribe_pool(
+	struct stasis_topic *topic,
+	stasis_subscription_cb callback,
+	void *data,
+	const char *file,
+	int lineno,
+	const char *func)
+{
+	return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func);
+}
+#else
 struct stasis_subscription *stasis_subscribe_pool(
 	struct stasis_topic *topic,
 	stasis_subscription_cb callback,
@@ -553,6 +768,7 @@
 {
 	return internal_stasis_subscribe(topic, callback, data, 1, 1);
 }
+#endif
 
 static int sub_cleanup(void *data)
 {
@@ -808,6 +1024,11 @@
 		topic_add_subscription(
 			AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
 	}
+
+#ifdef AST_DEVMODE
+	topic->statistics->subscriber_count += 1;
+#endif
+
 	ao2_unlock(topic);
 
 	return 0;
@@ -825,6 +1046,13 @@
 	}
 	res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
 		AST_VECTOR_ELEM_CLEANUP_NOOP);
+
+#ifdef AST_DEVMODE
+	if (!res) {
+		topic->statistics->subscriber_count -= 1;
+	}
+#endif
+
 	ao2_unlock(topic);
 
 	return res;
@@ -885,8 +1113,10 @@
  * \param message The message to send
  * \param synchronous If non-zero, synchronize on the subscriber receiving
  * the message
+ * \retval 0 if message was not dispatched
+ * \retval 1 if message was dispatched
  */
-static void dispatch_message(struct stasis_subscription *sub,
+static unsigned int dispatch_message(struct stasis_subscription *sub,
 	struct stasis_message *message,
 	int synchronous)
 {
@@ -938,14 +1168,22 @@
 			break;
 		}
 
-		return;
+#ifdef AST_DEVMODE
+		ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1);
+#endif
+
+		return 0;
 
 	} while (0);
 
+#ifdef AST_DEVMODE
+	ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1);
+#endif
+
 	if (!sub->mailbox) {
 		/* Dispatch directly */
 		subscription_invoke(sub, message);
-		return;
+		return 1;
 	}
 
 	/* Bump the message for the taskprocessor push. This will get de-ref'd
@@ -957,6 +1195,7 @@
 			/* Push failed; ugh. */
 			ast_log(LOG_ERROR, "Dropping async dispatch\n");
 			ao2_cleanup(message);
+			return 0;
 		}
 	} else {
 		struct sync_task_data std;
@@ -972,7 +1211,7 @@
 			ao2_cleanup(message);
 			ast_mutex_destroy(&std.lock);
 			ast_cond_destroy(&std.cond);
-			return;
+			return 0;
 		}
 
 		ast_mutex_lock(&std.lock);
@@ -984,6 +1223,8 @@
 		ast_mutex_destroy(&std.lock);
 		ast_cond_destroy(&std.cond);
 	}
+
+	return 1;
 }
 
 /*!
@@ -997,12 +1238,41 @@
 	struct stasis_message *message, struct stasis_subscription *sync_sub)
 {
 	size_t i;
+	unsigned int dispatched = 0;
+#ifdef AST_DEVMODE
+	int message_type_id = stasis_message_type_id(stasis_message_type(message));
+	struct stasis_message_type_statistics *statistics;
+	struct timeval start;
+	int elapsed;
+#endif
 
 	ast_assert(topic != NULL);
 	ast_assert(message != NULL);
 
+#ifdef AST_DEVMODE
+	ast_mutex_lock(&message_type_statistics_lock);
+	if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) {
+		struct stasis_message_type_statistics new_statistics = {
+			.published = 0,
+		};
+		if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) {
+			ast_mutex_unlock(&message_type_statistics_lock);
+			return;
+		}
+	}
+	statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id);
+	statistics->message_type = stasis_message_type(message);
+	ast_mutex_unlock(&message_type_statistics_lock);
+
+	ast_atomic_fetchadd_int(&statistics->published, +1);
+#endif
+
 	/* If there are no subscribers don't bother */
 	if (!stasis_topic_subscribers(topic)) {
+#ifdef AST_DEVMODE
+		ast_atomic_fetchadd_int(&statistics->unused, +1);
+		ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
+#endif
 		return;
 	}
 
@@ -1011,15 +1281,35 @@
 	 * Make sure we hold onto a reference while dispatching.
 	 */
 	ao2_ref(topic, +1);
+#ifdef AST_DEVMODE
+	start = ast_tvnow();
+#endif
 	ao2_lock(topic);
 	for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
 		struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i);
 
 		ast_assert(sub != NULL);
 
-		dispatch_message(sub, message, (sub == sync_sub));
+		dispatched += dispatch_message(sub, message, (sub == sync_sub));
 	}
 	ao2_unlock(topic);
+
+#ifdef AST_DEVMODE
+	elapsed = ast_tvdiff_ms(ast_tvnow(), start);
+	if (elapsed > topic->statistics->highest_time_dispatched) {
+		topic->statistics->highest_time_dispatched = elapsed;
+	}
+	if (elapsed < topic->statistics->lowest_time_dispatched) {
+		topic->statistics->lowest_time_dispatched = elapsed;
+	}
+	if (dispatched) {
+		ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1);
+	} else {
+		ast_atomic_fetchadd_int(&statistics->unused, +1);
+		ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1);
+	}
+#endif
+
 	ao2_ref(topic, -1);
 }
 
@@ -1805,9 +2095,458 @@
 
 /*! @} */
 
+#ifdef AST_DEVMODE
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show subscriptions'
+ * \note Time values are int64_t and are cast to long long for a portable format. */
+static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct ao2_iterator iter;
+	struct stasis_subscription_statistics *statistics;
+	int count = 0;
+	int dropped = 0;
+	int passed = 0;
+#define FMT_HEADERS		"%-64s %10s %10s %16s %16s\n"
+#define FMT_FIELDS		"%-64s %10d %10d %16lld %16lld\n"
+#define FMT_FIELDS2		"%-64s %10d %10d\n"
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis statistics show subscriptions";
+		e->usage =
+			"Usage: stasis statistics show subscriptions\n"
+			"	Shows a list of subscriptions and their general statistics\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != e->args) {
+		return CLI_SHOWUSAGE;
+	}
+
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
+
+	iter = ao2_iterator_init(subscription_statistics, 0);
+	while ((statistics = ao2_iterator_next(&iter))) {
+		ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
+			(long long) statistics->lowest_time_invoked, (long long) statistics->highest_time_invoked);
+		dropped += statistics->messages_dropped;
+		passed += statistics->messages_passed;
+		ao2_ref(statistics, -1);
+		++count;
+	}
+	ao2_iterator_destroy(&iter);
+
+	ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
+	ast_cli(a->fd, "\n%d subscriptions\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+#undef FMT_FIELDS2
+
+	return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI tab completion for subscription statistics names
+ */
+static char *subscription_statistics_complete_name(const char *word, int state)
+{
+	struct stasis_subscription_statistics *statistics;
+	struct ao2_iterator it_statistics;
+	int wordlen = strlen(word);
+	int which = 0;
+	char *result = NULL;
+
+	it_statistics = ao2_iterator_init(subscription_statistics, 0);
+	while ((statistics = ao2_iterator_next(&it_statistics))) {
+		if (!strncasecmp(word, statistics->uniqueid, wordlen)
+			&& ++which > state) {
+			result = ast_strdup(statistics->uniqueid);
+		}
+		ao2_ref(statistics, -1);
+		if (result) {
+			break;
+		}
+	}
+	ao2_iterator_destroy(&it_statistics);
+	return result;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show subscription'
+ * \note Time values are int64_t and are cast to long long for a portable format. */
+static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct stasis_subscription_statistics *statistics;
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis statistics show subscription";
+		e->usage =
+		    "Usage: stasis statistics show subscription <uniqueid>\n"
+		    "       Show stasis subscription statistics.\n";
+		return NULL;
+	case CLI_GENERATE:
+		if (a->pos == 4) {
+			return subscription_statistics_complete_name(a->word, a->n);
+		} else {
+			return NULL;
+		}
+	}
+
+	if (a->argc != 5) {
+		return CLI_SHOWUSAGE;
+	}
+
+	statistics = ao2_find(subscription_statistics, a->argv[4], OBJ_SEARCH_KEY);
+	if (!statistics) {
+		ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]);
+		return CLI_FAILURE;
+	}
+
+	ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
+	ast_cli(a->fd, "Topic: %s\n", statistics->topic);
+	ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
+	ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
+	ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
+	ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
+	ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
+	ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
+	ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
+	ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %lld\n", (long long) statistics->lowest_time_invoked);
+	ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %lld\n", (long long) statistics->highest_time_invoked);
+
+	ao2_lock(statistics);
+	if (statistics->highest_time_message_type) {
+		ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type));
+	}
+	ao2_unlock(statistics);
+
+	ao2_ref(statistics, -1);
+
+	return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show topics'
+ * \note Time values are int64_t and are cast to long long for a portable format. */
+static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct ao2_iterator iter;
+	struct stasis_topic_statistics *statistics;
+	int count = 0;
+	int not_dispatched = 0;
+	int dispatched = 0;
+#define FMT_HEADERS		"%-64s %10s %10s %16s %16s\n"
+#define FMT_FIELDS		"%-64s %10d %10d %16lld %16lld\n"
+#define FMT_FIELDS2		"%-64s %10d %10d\n"
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis statistics show topics";
+		e->usage =
+			"Usage: stasis statistics show topics\n"
+			"	Shows a list of topics and their general statistics\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != e->args) {
+		return CLI_SHOWUSAGE;
+	}
+
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
+
+	iter = ao2_iterator_init(topic_statistics, 0);
+	while ((statistics = ao2_iterator_next(&iter))) {
+		ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
+			(long long) statistics->lowest_time_dispatched, (long long) statistics->highest_time_dispatched);
+		not_dispatched += statistics->messages_not_dispatched;
+		dispatched += statistics->messages_dispatched;
+		ao2_ref(statistics, -1);
+		++count;
+	}
+	ao2_iterator_destroy(&iter);
+
+	ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
+	ast_cli(a->fd, "\n%d topics\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+#undef FMT_FIELDS2
+
+	return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI tab completion for topic statistics names
+ */
+static char *topic_statistics_complete_name(const char *word, int state)
+{
+	struct stasis_topic_statistics *statistics;
+	struct ao2_iterator it_statistics;
+	int wordlen = strlen(word);
+	int which = 0;
+	char *result = NULL;
+
+	it_statistics = ao2_iterator_init(topic_statistics, 0);
+	while ((statistics = ao2_iterator_next(&it_statistics))) {
+		if (!strncasecmp(word, statistics->name, wordlen)
+			&& ++which > state) {
+			result = ast_strdup(statistics->name);
+		}
+		ao2_ref(statistics, -1);
+		if (result) {
+			break;
+		}
+	}
+	ao2_iterator_destroy(&it_statistics);
+	return result;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show topic'
+ * \note Time values are int64_t and are cast to long long for a portable format. */
+static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct stasis_topic_statistics *statistics;
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis statistics show topic";
+		e->usage =
+		    "Usage: stasis statistics show topic <name>\n"
+		    "       Show stasis topic statistics.\n";
+		return NULL;
+	case CLI_GENERATE:
+		if (a->pos == 4) {
+			return topic_statistics_complete_name(a->word, a->n);
+		} else {
+			return NULL;
+		}
+	}
+
+	if (a->argc != 5) {
+		return CLI_SHOWUSAGE;
+	}
+
+	statistics = ao2_find(topic_statistics, a->argv[4], OBJ_SEARCH_KEY);
+	if (!statistics) {
+		ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]);
+		return CLI_FAILURE;
+	}
+
+	ast_cli(a->fd, "Topic: %s\n", statistics->name);
+	ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
+	ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
+	ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %lld\n", (long long) statistics->lowest_time_dispatched);
+	ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %lld\n", (long long) statistics->highest_time_dispatched);
+	ast_cli(a->fd, "Number of subscribers: %d\n", statistics->subscriber_count);
+
+	ao2_ref(statistics, -1);
+
+	return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis statistics show messages'
+ */
+static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	int i;
+	int count = 0;
+	int published = 0;
+	int unused = 0;
+#define FMT_HEADERS		"%-64s %10s %10s\n"
+#define FMT_FIELDS		"%-64s %10d %10d\n"
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis statistics show messages";
+		e->usage =
+			"Usage: stasis statistics show messages\n"
+			"	Shows a list of message types and their general statistics\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != e->args) {
+		return CLI_SHOWUSAGE;
+	}
+
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused");
+
+	ast_mutex_lock(&message_type_statistics_lock);
+	for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) {
+		struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i);
+
+		if (!statistics->message_type) {
+			continue;
+		}
+
+		ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published,
+			statistics->unused);
+		published += statistics->published;
+		unused += statistics->unused;
+		++count;
+	}
+	ast_mutex_unlock(&message_type_statistics_lock);
+
+	ast_cli(a->fd, FMT_FIELDS, "Total", published, unused);
+	ast_cli(a->fd, "\n%d seen message types\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+
+	return CLI_SUCCESS;
+}
+
+static struct ast_cli_entry cli_stasis_statistics[] = {
+	AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"),
+	AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"),
+	AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"),
+	AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"),
+	AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"),
+};
+
+static int subscription_statistics_hash(const void *obj, const int flags)
+{
+	const struct stasis_subscription_statistics *object;
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		object = obj;
+		key = object->uniqueid;
+		break;
+	default:
+		/* Hash can only work on something with a full key. */
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_case_hash(key);
+}
+
+static int subscription_statistics_cmp(void *obj, void *arg, int flags)
+{
+	const struct stasis_subscription_statistics *object_left = obj;
+	const struct stasis_subscription_statistics *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = object_right->uniqueid;
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = strcasecmp(object_left->uniqueid, right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/* Not supported by container */
+		ast_assert(0);
+		cmp = -1;
+		break;
+	default:
+		/*
+		 * What arg points to is specific to this traversal callback
+		 * and has no special meaning to astobj2.
+		 */
+		cmp = 0;
+		break;
+	}
+	if (cmp) {
+		return 0;
+	}
+	/*
+	 * At this point the traversal callback is identical to a sorted
+	 * container.
+	 */
+	return CMP_MATCH;
+}
+
+static int topic_statistics_hash(const void *obj, const int flags)
+{
+	const struct stasis_topic_statistics *object;
+	const char *key;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_KEY:
+		key = obj;
+		break;
+	case OBJ_SEARCH_OBJECT:
+		object = obj;
+		key = object->name;
+		break;
+	default:
+		/* Hash can only work on something with a full key. */
+		ast_assert(0);
+		return 0;
+	}
+	return ast_str_case_hash(key);
+}
+
+static int topic_statistics_cmp(void *obj, void *arg, int flags)
+{
+	const struct stasis_topic_statistics *object_left = obj;
+	const struct stasis_topic_statistics *object_right = arg;
+	const char *right_key = arg;
+	int cmp;
+
+	switch (flags & OBJ_SEARCH_MASK) {
+	case OBJ_SEARCH_OBJECT:
+		right_key = object_right->name;
+		/* Fall through */
+	case OBJ_SEARCH_KEY:
+		cmp = strcasecmp(object_left->name, right_key);
+		break;
+	case OBJ_SEARCH_PARTIAL_KEY:
+		/* Not supported by container */
+		ast_assert(0);
+		cmp = -1;
+		break;
+	default:
+		/*
+		 * What arg points to is specific to this traversal callback
+		 * and has no special meaning to astobj2.
+		 */
+		cmp = 0;
+		break;
+	}
+	if (cmp) {
+		return 0;
+	}
+	/*
+	 * At this point the traversal callback is identical to a sorted
+	 * container.
+	 */
+	return CMP_MATCH;
+}
+#endif
+
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
+#ifdef AST_DEVMODE
+	ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics));
+	AST_VECTOR_FREE(&message_type_statistics);
+	ao2_cleanup(subscription_statistics);
+	ao2_cleanup(topic_statistics);
+#endif
 	ast_threadpool_shutdown(pool);
 	pool = NULL;
 	STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
@@ -1902,5 +2641,28 @@
 		return -1;
 	}
 
+#ifdef AST_DEVMODE
+	/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
+	 * topic or subscription.
+	 */
+	subscription_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS,
+		subscription_statistics_hash, 0, subscription_statistics_cmp);
+	if (!subscription_statistics) {
+		return -1;
+	}
+
+	topic_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS,
+		topic_statistics_hash, 0, topic_statistics_cmp);
+	if (!topic_statistics) {
+		return -1;
+	}
+
+	AST_VECTOR_INIT(&message_type_statistics, 0);
+
+	if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) {
+		return -1;
+	}
+#endif
+
 	return 0;
 }
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index bc975fd..5aa04fb 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -971,7 +971,11 @@
 	}
 	ast_free(new_name);
 
+#ifdef AST_DEVMODE
+	caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
+#else
 	caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
+#endif
 	if (caching_topic->sub == NULL) {
 		ao2_ref(caching_topic, -1);
 
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 197f7f9..9a390ef 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -204,8 +204,14 @@
 	}
 }
 
+#ifdef AST_DEVMODE
+static struct stasis_message_router *stasis_message_router_create_internal(
+	struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
+	const char *func)
+#else
 static struct stasis_message_router *stasis_message_router_create_internal(
 	struct stasis_topic *topic, int use_thread_pool)
+#endif
 {
 	int res;
 	struct stasis_message_router *router;
@@ -224,11 +230,20 @@
 		return NULL;
 	}
 
+#ifdef AST_DEVMODE
+	if (use_thread_pool) {
+		router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
+	} else {
+		router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
+	}
+#else
 	if (use_thread_pool) {
 		router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
 	} else {
 		router->subscription = stasis_subscribe(topic, router_dispatch, router);
 	}
+#endif
+
 	if (!router->subscription) {
 		ao2_ref(router, -1);
 
@@ -241,17 +256,33 @@
 	return router;
 }
 
+#ifdef AST_DEVMODE
+struct stasis_message_router *__stasis_message_router_create(
+	struct stasis_topic *topic, const char *file, int lineno, const char *func)
+{
+	return stasis_message_router_create_internal(topic, 0, file, lineno, func);
+}
+#else
 struct stasis_message_router *stasis_message_router_create(
 	struct stasis_topic *topic)
 {
 	return stasis_message_router_create_internal(topic, 0);
 }
+#endif
 
+#ifdef AST_DEVMODE
+struct stasis_message_router *__stasis_message_router_create_pool(
+	struct stasis_topic *topic, const char *file, int lineno, const char *func)
+{
+	return stasis_message_router_create_internal(topic, 1, file, lineno, func);
+}
+#else
 struct stasis_message_router *stasis_message_router_create_pool(
 	struct stasis_topic *topic)
 {
 	return stasis_message_router_create_internal(topic, 1);
 }
+#endif
 
 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
 {

-- 
To view, visit https://gerrit.asterisk.org/10766
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I94411b53767f89ee01714daaecf0c2f1666e863f
Gerrit-Change-Number: 10766
Gerrit-PatchSet: 3
Gerrit-Owner: Joshua C. Colp <jcolp at digium.com>
Gerrit-Reviewer: Benjamin Keith Ford <bford at digium.com>
Gerrit-Reviewer: Friendly Automation (1000185)
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20181212/1eade175/attachment-0001.html>


More information about the asterisk-code-review mailing list