[Asterisk-code-review] stasis: Improve topic/subscription names and statistics. (...asterisk[master])

Joshua C. Colp asteriskteam at digium.com
Mon Mar 11 07:34:11 CDT 2019


Joshua C. Colp has uploaded this change for review. ( https://gerrit.asterisk.org/c/asterisk/+/11130 )


Change subject: stasis: Improve topic/subscription names and statistics.
......................................................................

stasis: Improve topic/subscription names and statistics.

Topic names now follow: <subsystem>:<functionality>[/<object>]

This ensures that they are all unique, and also provides better
insight into what each topic is for.

Subscriber ids are now built from the name of the main topic they
are subscribed to plus an incrementing integer, which makes it
easier to understand what each subscription is primarily
responsible for.

Both CLI commands for listing topic and subscription statistics
now sort their output, making it a bit easier to see what is going on.

Subscriptions will now show all topics that they are receiving messages
from, not just the main topic they were subscribed to.

ASTERISK-28335

Change-Id: I484e971a38c3640f2bd156282e532eed84bf220d
---
M apps/app_voicemail.c
M include/asterisk/stasis.h
M main/app.c
M main/cdr.c
M main/cel.c
M main/channel_internal_api.c
M main/devicestate.c
M main/endpoints.c
M main/manager.c
M main/parking.c
M main/presencestate.c
M main/rtp_engine.c
M main/security_events.c
M main/stasis.c
M main/stasis_bridges.c
M main/stasis_cache.c
M main/stasis_cache_pattern.c
M main/stasis_channels.c
M main/stasis_endpoints.c
M main/stasis_system.c
M main/test.c
M res/res_corosync.c
M res/stasis/app.c
23 files changed, 283 insertions(+), 63 deletions(-)



  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/30/11130/1

diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c
index a5a3691..a151f0c 100644
--- a/apps/app_voicemail.c
+++ b/apps/app_voicemail.c
@@ -13334,6 +13334,7 @@
 static void mwi_sub_event_cb(struct stasis_subscription_change *change)
 {
 	struct mwi_sub_task *mwist;
+	const char *topic;
 	char *context;
 	char *mailbox;
 
@@ -13342,7 +13343,9 @@
 		return;
 	}
 
-	if (separate_mailbox(ast_strdupa(stasis_topic_name(change->topic)), &mailbox, &context)) {
+	/* The topic name is prefixed with "mwi:all/" as this is a pool topic */
+	topic = stasis_topic_name(change->topic) + 8;
+	if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) {
 		ast_free(mwist);
 		return;
 	}
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 09db9ea..0b229bf 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -514,6 +514,8 @@
  * from a topic and destroy it. As a result the topic can persist until
  * the last subscriber unsubscribes itself even if there is no
  * publisher.
+ *
+ * \note Topic names should be in the form of <subsystem>:<functionality>[/<object>]
  */
 struct stasis_topic *stasis_topic_create(const char *name);
 
diff --git a/main/app.c b/main/app.c
index ec74490..e8a4d2f 100644
--- a/main/app.c
+++ b/main/app.c
@@ -3337,7 +3337,7 @@
 		stasis_publish(mailbox_specific_topic, clear_msg);
 	}
 
-	stasis_topic_pool_delete_topic(mwi_topic_pool, stasis_topic_name(mailbox_specific_topic));
+	stasis_topic_pool_delete_topic(mwi_topic_pool, mwi_state->uniqueid);
 
 	ao2_cleanup(clear_msg);
 	return 0;
@@ -3430,7 +3430,7 @@
 	if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_vm_app_type) != 0) {
 		return -1;
 	}
-	mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
+	mwi_topic_all = stasis_topic_create("mwi:all");
 	if (!mwi_topic_all) {
 		return -1;
 	}
@@ -3446,7 +3446,7 @@
 	if (!mwi_topic_pool) {
 		return -1;
 	}
-	queue_topic_all = stasis_topic_create("stasis_queue_topic");
+	queue_topic_all = stasis_topic_create("queue:all");
 	if (!queue_topic_all) {
 		return -1;
 	}
diff --git a/main/cdr.c b/main/cdr.c
index 53f3362..f8f038c 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -4504,7 +4504,7 @@
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
-	cdr_topic = stasis_topic_create("cdr_engine");
+	cdr_topic = stasis_topic_create("cdr:aggregator");
 	if (!cdr_topic) {
 		return AST_MODULE_LOAD_FAILURE;
 	}
diff --git a/main/cel.c b/main/cel.c
index 95376db..1e77d25 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -1431,12 +1431,12 @@
  */
 static int create_subscriptions(void)
 {
-	cel_aggregation_topic = stasis_topic_create("cel_aggregation_topic");
+	cel_aggregation_topic = stasis_topic_create("cel:aggregator");
 	if (!cel_aggregation_topic) {
 		return -1;
 	}
 
-	cel_topic = stasis_topic_create("cel_topic");
+	cel_topic = stasis_topic_create("cel:misc");
 	if (!cel_topic) {
 		return -1;
 	}
diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c
index 22a2bb6..be8fd7c 100644
--- a/main/channel_internal_api.c
+++ b/main/channel_internal_api.c
@@ -1520,14 +1520,23 @@
 
 int ast_channel_internal_setup_topics(struct ast_channel *chan)
 {
-	const char *topic_name = chan->uniqueid.unique_id;
+	char *topic_name;
+	int ret;
 	ast_assert(chan->topic == NULL);
 
-	if (ast_strlen_zero(topic_name)) {
-		topic_name = "<dummy-channel>";
+	if (ast_strlen_zero(chan->uniqueid.unique_id)) {
+		static int dummy_id;
+		ret = ast_asprintf(&topic_name, "channel:dummy-%d", ast_atomic_fetchadd_int(&dummy_id, +1));
+	} else {
+		ret = ast_asprintf(&topic_name, "channel:%s", chan->uniqueid.unique_id);
+	}
+
+	if (ret < 0) {
+		return -1;
 	}
 
 	chan->topic = stasis_topic_create(topic_name);
+	ast_free(topic_name);
 	if (!chan->topic) {
 		return -1;
 	}
diff --git a/main/devicestate.c b/main/devicestate.c
index b6c740c..ecf255f 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -902,7 +902,7 @@
 	if (STASIS_MESSAGE_TYPE_INIT(ast_device_state_message_type) != 0) {
 		return -1;
 	}
-	device_state_topic_all = stasis_topic_create("ast_device_state_topic");
+	device_state_topic_all = stasis_topic_create("devicestate:all");
 	if (!device_state_topic_all) {
 		return -1;
 	}
diff --git a/main/endpoints.c b/main/endpoints.c
index b958932..c53e31d 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -255,9 +255,17 @@
 	}
 
 	if (!ast_strlen_zero(resource)) {
+		char *topic_name;
+		int ret;
+
+		ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
+		if (ret < 0) {
+			return NULL;
+		}
 
 		endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
-			endpoint->id);
+			topic_name);
+		ast_free(topic_name);
 		if (!endpoint->topics) {
 			return NULL;
 		}
@@ -284,8 +292,17 @@
 		endpoint_publish_snapshot(endpoint);
 		ao2_link(endpoints, endpoint);
 	} else {
+		char *topic_name;
+		int ret;
+
+		ret = ast_asprintf(&topic_name, "endpoint:%s", endpoint->id);
+		if (ret < 0) {
+			return NULL;
+		}
+
 		endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
-			endpoint->id);
+			topic_name);
+		ast_free(topic_name);
 		if (!endpoint->topics) {
 			return NULL;
 		}
diff --git a/main/manager.c b/main/manager.c
index 0c715e4..8e7a8b2 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -8996,7 +8996,7 @@
 		if (res != 0) {
 			return -1;
 		}
-		manager_topic = stasis_topic_create("manager_topic");
+		manager_topic = stasis_topic_create("manager:core");
 		if (!manager_topic) {
 			return -1;
 		}
diff --git a/main/parking.c b/main/parking.c
index bf0d0b6..d77a767 100644
--- a/main/parking.c
+++ b/main/parking.c
@@ -56,7 +56,7 @@
 		return -1;
 	}
 
-	parking_topic = stasis_topic_create("ast_parking");
+	parking_topic = stasis_topic_create("parking:all");
 	if (!parking_topic) {
 		return -1;
 	}
diff --git a/main/presencestate.c b/main/presencestate.c
index 65b7f69..45433b1 100644
--- a/main/presencestate.c
+++ b/main/presencestate.c
@@ -500,7 +500,7 @@
 		return -1;
 	}
 
-	presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all");
+	presence_state_topic_all = stasis_topic_create("presence_state:all");
 	if (!presence_state_topic_all) {
 		return -1;
 	}
diff --git a/main/rtp_engine.c b/main/rtp_engine.c
index fd1613c..403b663 100644
--- a/main/rtp_engine.c
+++ b/main/rtp_engine.c
@@ -3539,7 +3539,7 @@
 	ast_rwlock_init(&mime_types_lock);
 	ast_rwlock_init(&static_RTP_PT_lock);
 
-	rtp_topic = stasis_topic_create("rtp_topic");
+	rtp_topic = stasis_topic_create("rtp:all");
 	if (!rtp_topic) {
 		return -1;
 	}
diff --git a/main/security_events.c b/main/security_events.c
index 37dce02..0328eca 100644
--- a/main/security_events.c
+++ b/main/security_events.c
@@ -484,7 +484,7 @@
 {
 	ast_register_cleanup(security_stasis_cleanup);
 
-	security_topic = stasis_topic_create("ast_security");
+	security_topic = stasis_topic_create("security:all");
 	if (!security_topic) {
 		return -1;
 	}
diff --git a/main/stasis.c b/main/stasis.c
index fa92eeb..2e75e50 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -349,6 +349,8 @@
 	int messages_dispatched;
 	/*! \brief The ids of the subscribers to this topic */
 	struct ao2_container *subscribers;
+	/*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
+	struct stasis_topic *topic;
 	/*! \brief Name of the topic */
 	char name[0];
 };
@@ -366,6 +368,9 @@
 	struct stasis_topic_statistics *statistics;
 #endif
 
+	/*! Unique incrementing integer for subscriber ids */
+	int subscriber_id;
+
 	/*! Name of the topic */
 	char name[0];
 };
@@ -412,11 +417,11 @@
 	ao2_cleanup(statistics->subscribers);
 }
 
-static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name)
+static struct stasis_topic_statistics *stasis_topic_statistics_create(struct stasis_topic *topic)
 {
 	struct stasis_topic_statistics *statistics;
 
-	statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, topic_statistics_destroy);
+	statistics = ao2_alloc(sizeof(*statistics) + strlen(topic->name) + 1, topic_statistics_destroy);
 	if (!statistics) {
 		return NULL;
 	}
@@ -427,7 +432,9 @@
 		return NULL;
 	}
 
-	strcpy(statistics->name, name); /* SAFE */
+	/* This is strictly used for the pointer address when showing the topic */
+	statistics->topic = topic;
+	strcpy(statistics->name, topic->name); /* SAFE */
 	ao2_link(topic_statistics, statistics);
 
 	return statistics;
@@ -448,7 +455,7 @@
 	res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
 	res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
 #ifdef AST_DEVMODE
-	topic->statistics = stasis_topic_statistics_create(name);
+	topic->statistics = stasis_topic_statistics_create(topic);
 	if (!topic->name || !topic->statistics || res)
 #else
 	if (!topic->name || res)
@@ -477,8 +484,8 @@
 	const char *file;
 	/*! \brief The function where the subscription originates */
 	const char *func;
-	/*! \brief Name of the topic we subscribed to */
-	char *topic;
+	/*! \brief Names of the topics we are subscribed to */
+	struct ao2_container *topics;
 	/*! \brief The message type that currently took the longest to process */
 	struct stasis_message_type *highest_time_message_type;
 	/*! \brief Highest time spent invoking a message */
@@ -495,6 +502,8 @@
 	int uses_threadpool;
 	/*! \brief The line number where the subscription originates */
 	int lineno;
+	/*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
+	struct stasis_subscription *sub;
 	/*! \brief Unique ID of the subscription */
 	char uniqueid[0];
 };
@@ -503,7 +512,7 @@
 /*! \internal */
 struct stasis_subscription {
 	/*! Unique ID for this subscription */
-	char uniqueid[AST_UUID_STR_LEN];
+	char *uniqueid;
 	/*! Topic subscribed to. */
 	struct stasis_topic *topic;
 	/*! Mailbox for processing incoming messages. */
@@ -546,6 +555,7 @@
 	 * be bad. */
 	ast_assert(stasis_subscription_is_done(sub));
 
+	ast_free(sub->uniqueid);
 	ao2_cleanup(sub->topic);
 	sub->topic = NULL;
 	ast_taskprocessor_unreference(sub->mailbox);
@@ -628,26 +638,37 @@
 }
 
 #ifdef AST_DEVMODE
-static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid,
-	const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno,
+static void subscription_statistics_destroy(void *obj)
+{
+	struct stasis_subscription_statistics *statistics = obj;
+
+	ao2_cleanup(statistics->topics);
+}
+
+static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
+	int needs_mailbox, int use_thread_pool, const char *file, int lineno,
 	const char *func)
 {
 	struct stasis_subscription_statistics *statistics;
-	size_t uniqueid_len = strlen(uniqueid) + 1;
 
-	statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL);
+	statistics = ao2_alloc(sizeof(*statistics) + strlen(sub->uniqueid) + 1, subscription_statistics_destroy);
 	if (!statistics) {
 		return NULL;
 	}
 
+	statistics->topics = ast_str_container_alloc(1);
+	if (!statistics->topics) {
+		ao2_ref(statistics, -1);
+		return NULL;
+	}
+
 	statistics->file = file;
 	statistics->lineno = lineno;
 	statistics->func = func;
 	statistics->uses_mailbox = needs_mailbox;
 	statistics->uses_threadpool = use_thread_pool;
-	strcpy(statistics->uniqueid, uniqueid); /* SAFE */
-	statistics->topic = statistics->uniqueid + uniqueid_len;
-	strcpy(statistics->topic, topic); /* SAFE */
+	strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
+	statistics->sub = sub;
 	ao2_link(subscription_statistics, statistics);
 
 	return statistics;
@@ -665,6 +686,7 @@
 	const char *func)
 {
 	struct stasis_subscription *sub;
+	int ret;
 
 	if (!topic) {
 		return NULL;
@@ -675,12 +697,17 @@
 	if (!sub) {
 		return NULL;
 	}
-	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 #ifdef AST_DEVMODE
-	sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox,
-		use_thread_pool, file, lineno, func);
-	if (!sub->statistics) {
+	ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
+	sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
+	if (ret < 0 || !sub->statistics) {
+		ao2_ref(sub, -1);
+		return NULL;
+	}
+#else
+	ret = ast_asprintf(&sub->uniqueid, "%s-%d", stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
+	if (ret < 0) {
 		ao2_ref(sub, -1);
 		return NULL;
 	}
@@ -1012,6 +1039,7 @@
 
 #ifdef AST_DEVMODE
 	ast_str_container_add(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
+	ast_str_container_add(sub->statistics->topics, stasis_topic_name(topic));
 #endif
 
 	ao2_unlock(topic);
@@ -1035,6 +1063,7 @@
 #ifdef AST_DEVMODE
 	if (!res) {
 		ast_str_container_remove(topic->statistics->subscribers, stasis_subscription_uniqueid(sub));
+		ast_str_container_remove(sub->statistics->topics, stasis_topic_name(topic));
 	}
 #endif
 
@@ -1498,6 +1527,7 @@
 struct topic_pool_entry {
 	struct stasis_forward *forward;
 	struct stasis_topic *topic;
+	char name[0];
 };
 
 static void topic_pool_entry_dtor(void *obj)
@@ -1509,10 +1539,19 @@
 	entry->topic = NULL;
 }
 
-static struct topic_pool_entry *topic_pool_entry_alloc(void)
+static struct topic_pool_entry *topic_pool_entry_alloc(const char *topic_name)
 {
-	return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
-		AO2_ALLOC_OPT_LOCK_NOLOCK);
+	struct topic_pool_entry *topic_pool_entry;
+
+	topic_pool_entry = ao2_alloc_options(sizeof(*topic_pool_entry) + strlen(topic_name) + 1,
+		topic_pool_entry_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+	if (!topic_pool_entry) {
+		return NULL;
+	}
+
+	strcpy(topic_pool_entry->name, topic_name); /* Safe */
+
+	return topic_pool_entry;
 }
 
 struct stasis_topic_pool {
@@ -1550,7 +1589,7 @@
 		break;
 	case OBJ_SEARCH_OBJECT:
 		object = obj;
-		key = stasis_topic_name(object->topic);
+		key = object->name;
 		break;
 	default:
 		/* Hash can only work on something with a full key. */
@@ -1569,10 +1608,10 @@
 
 	switch (flags & OBJ_SEARCH_MASK) {
 	case OBJ_SEARCH_OBJECT:
-		right_key = stasis_topic_name(object_right->topic);
+		right_key = object_right->name;
 		/* Fall through */
 	case OBJ_SEARCH_KEY:
-		cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
+		cmp = strcasecmp(object_left->name, right_key);
 		break;
 	case OBJ_SEARCH_PARTIAL_KEY:
 		/* Not supported by container */
@@ -1649,18 +1688,29 @@
 {
 	RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
 	SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
+	char *new_topic_name;
+	int ret;
 
 	topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
 	if (topic_pool_entry) {
 		return topic_pool_entry->topic;
 	}
 
-	topic_pool_entry = topic_pool_entry_alloc();
+	topic_pool_entry = topic_pool_entry_alloc(topic_name);
 	if (!topic_pool_entry) {
 		return NULL;
 	}
 
-	topic_pool_entry->topic = stasis_topic_create(topic_name);
+	/* To provide further detail and to ensure that the topic is unique within the scope of the
+	 * system we prefix it with the pooling topic name, which should itself already be unique.
+	 */
+	ret = ast_asprintf(&new_topic_name, "%s/%s", stasis_topic_name(pool->pool_topic), topic_name);
+	if (ret < 0) {
+		return NULL;
+	}
+
+	topic_pool_entry->topic = stasis_topic_create(new_topic_name);
+	ast_free(new_topic_name);
 	if (!topic_pool_entry->topic) {
 		return NULL;
 	}
@@ -2084,10 +2134,48 @@
 
 /*!
  * \internal
+ * \brief Subscription statistics ao2 container sort function.
+ *
+ * \param obj_left pointer to the (user-defined part) of an object.
+ * \param obj_right pointer to the (user-defined part) of an object.
+ * \param flags flags from ao2_callback()
+ *   OBJ_POINTER - if set, 'obj_right', is an object.
+ *   OBJ_KEY - if set, 'obj_right', is a search key item that is not an object.
+ *   OBJ_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
+ *
+ * \retval <0 if obj_left < obj_right
+ * \retval =0 if obj_left == obj_right
+ * \retval >0 if obj_left > obj_right
+ */
+static int statistics_subscription_sort_cmp(const void *obj_left, const void *obj_right, int flags)
+{
+	const struct stasis_subscription_statistics *statistics_left = obj_left;
+	const struct stasis_subscription_statistics *statistics_right = obj_right;
+	const char *right_key = obj_right;
+	int cmp;
+
+	switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+	default:
+	case OBJ_POINTER:
+		right_key = statistics_right->uniqueid;
+	/* Fall through */
+	case OBJ_KEY:
+		cmp = strcmp(statistics_left->uniqueid, right_key);
+		break;
+	case OBJ_PARTIAL_KEY:
+		cmp = strncmp(statistics_left->uniqueid, right_key, strlen(right_key));
+		break;
+	}
+	return cmp;
+}
+
+/*!
+ * \internal
  * \brief CLI command implementation for 'stasis statistics show subscriptions'
  */
 static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
+	struct ao2_container *sorted_subscriptions;
 	struct ao2_iterator iter;
 	struct stasis_subscription_statistics *statistics;
 	int count = 0;
@@ -2112,9 +2200,22 @@
 		return CLI_SHOWUSAGE;
 	}
 
+	sorted_subscriptions = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+		statistics_subscription_sort_cmp, NULL);
+	if (!sorted_subscriptions) {
+		ast_cli(a->fd, "Could not create container for sorting subscription statistics\n");
+		return CLI_SUCCESS;
+	}
+
+	if (ao2_container_dup(sorted_subscriptions, subscription_statistics, 0)) {
+		ao2_ref(sorted_subscriptions, -1);
+		ast_cli(a->fd, "Could not sort subscription statistics\n");
+		return CLI_SUCCESS;
+	}
+
 	ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke");
 
-	iter = ao2_iterator_init(subscription_statistics, 0);
+	iter = ao2_iterator_init(sorted_subscriptions, 0);
 	while ((statistics = ao2_iterator_next(&iter))) {
 		ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed,
 			statistics->lowest_time_invoked, statistics->highest_time_invoked);
@@ -2125,6 +2226,8 @@
 	}
 	ao2_iterator_destroy(&iter);
 
+	ao2_ref(sorted_subscriptions, -1);
+
 	ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed);
 	ast_cli(a->fd, "\n%d subscriptions\n\n", count);
 
@@ -2169,6 +2272,8 @@
 static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	struct stasis_subscription_statistics *statistics;
+	struct ao2_iterator i;
+	char *name;
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -2196,7 +2301,7 @@
 	}
 
 	ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid);
-	ast_cli(a->fd, "Topic: %s\n", statistics->topic);
+	ast_cli(a->fd, "Pointer Address: %p\n", statistics->sub);
 	ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>"));
 	ast_cli(a->fd, "Source line number: %d\n", statistics->lineno);
 	ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>"));
@@ -2213,6 +2318,16 @@
 	}
 	ao2_unlock(statistics);
 
+	ast_cli(a->fd, "Number of topics: %d\n", ao2_container_count(statistics->topics));
+
+	ast_cli(a->fd, "Subscribed topics:\n");
+	i = ao2_iterator_init(statistics->topics, 0);
+	while ((name = ao2_iterator_next(&i))) {
+		ast_cli(a->fd, "\t%s\n", name);
+		ao2_ref(name, -1);
+	}
+	ao2_iterator_destroy(&i);
+
 	ao2_ref(statistics, -1);
 
 	return CLI_SUCCESS;
@@ -2220,18 +2335,56 @@
 
 /*!
  * \internal
+ * \brief Topic ao2 container sort function.
+ *
+ * \param obj_left pointer to the (user-defined part) of an object.
+ * \param obj_right pointer to the (user-defined part) of an object.
+ * \param flags flags from ao2_callback()
+ *   OBJ_POINTER - if set, 'obj_right', is an object.
+ *   OBJ_KEY - if set, 'obj_right', is a search key item that is not an object.
+ *   OBJ_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
+ *
+ * \retval <0 if obj_left < obj_right
+ * \retval =0 if obj_left == obj_right
+ * \retval >0 if obj_left > obj_right
+ */
+static int statistics_topic_sort_cmp(const void *obj_left, const void *obj_right, int flags)
+{
+	const struct stasis_topic_statistics *topic_left = obj_left;
+	const struct stasis_topic_statistics *topic_right = obj_right;
+	const char *right_key = obj_right;
+	int cmp;
+
+	switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+	default:
+	case OBJ_POINTER:
+		right_key = topic_right->name;
+		/* Fall through */
+	case OBJ_KEY:
+		cmp = strcmp(topic_left->name, right_key);
+		break;
+	case OBJ_PARTIAL_KEY:
+		cmp = strncmp(topic_left->name, right_key, strlen(right_key));
+		break;
+	}
+	return cmp;
+}
+
+/*!
+ * \internal
  * \brief CLI command implementation for 'stasis statistics show topics'
  */
 static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
+	struct ao2_container *sorted_topics;
 	struct ao2_iterator iter;
 	struct stasis_topic_statistics *statistics;
 	int count = 0;
 	int not_dispatched = 0;
 	int dispatched = 0;
-#define FMT_HEADERS		"%-64s %10s %10s %16s %16s\n"
-#define FMT_FIELDS		"%-64s %10d %10d %16ld %16ld\n"
-#define FMT_FIELDS2		"%-64s %10d %10d\n"
+#define FMT_HEADERS		"%-64s %10s %10s %10s %16s %16s\n"
+#define FMT_FIELDS		"%-64s %10d %10d %10d %16ld %16ld\n"
+#define FMT_FIELDS2		"%-64s %10s %10d %10d\n"
 
 	switch (cmd) {
 	case CLI_INIT:
@@ -2248,11 +2401,25 @@
 		return CLI_SHOWUSAGE;
 	}
 
-	ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
+	sorted_topics = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+		statistics_topic_sort_cmp, NULL);
+	if (!sorted_topics) {
+		ast_cli(a->fd, "Could not create container for sorting topic statistics\n");
+		return CLI_SUCCESS;
+	}
 
-	iter = ao2_iterator_init(topic_statistics, 0);
+	if (ao2_container_dup(sorted_topics, topic_statistics, 0)) {
+		ao2_ref(sorted_topics, -1);
+		ast_cli(a->fd, "Could not sort topic statistics\n");
+		return CLI_SUCCESS;
+	}
+
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Subscribers", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch");
+
+	iter = ao2_iterator_init(sorted_topics, 0);
 	while ((statistics = ao2_iterator_next(&iter))) {
-		ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched,
+		ast_cli(a->fd, FMT_FIELDS, statistics->name, ao2_container_count(statistics->subscribers),
+			statistics->messages_not_dispatched, statistics->messages_dispatched,
 			statistics->lowest_time_dispatched, statistics->highest_time_dispatched);
 		not_dispatched += statistics->messages_not_dispatched;
 		dispatched += statistics->messages_dispatched;
@@ -2261,7 +2428,9 @@
 	}
 	ao2_iterator_destroy(&iter);
 
-	ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched);
+	ao2_ref(sorted_topics, -1);
+
+	ast_cli(a->fd, FMT_FIELDS2, "Total", "", not_dispatched, dispatched);
 	ast_cli(a->fd, "\n%d topics\n\n", count);
 
 #undef FMT_HEADERS
@@ -2334,6 +2503,7 @@
 	}
 
 	ast_cli(a->fd, "Topic: %s\n", statistics->name);
+	ast_cli(a->fd, "Pointer Address: %p\n", statistics->topic);
 	ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched);
 	ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched);
 	ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched);
diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c
index cfdf117..31d3eac 100644
--- a/main/stasis_bridges.c
+++ b/main/stasis_bridges.c
@@ -294,12 +294,21 @@
 
 int bridge_topics_init(struct ast_bridge *bridge)
 {
+	char *topic_name;
+	int ret;
+
 	if (ast_strlen_zero(bridge->uniqueid)) {
 		ast_log(LOG_ERROR, "Bridge id initialization required\n");
 		return -1;
 	}
 
-	bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
+	ret = ast_asprintf(&topic_name, "bridge:%s", bridge->uniqueid);
+	if (ret < 0) {
+		return -1;
+	}
+
+	bridge->topic = stasis_topic_pool_get_topic(bridge_topic_pool, topic_name);
+	ast_free(topic_name);
 	if (!bridge->topic) {
 		return -1;
 	}
@@ -1365,7 +1374,7 @@
 
 	ast_register_cleanup(stasis_bridging_cleanup);
 
-	bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
+	bridge_topic_all = stasis_topic_create("bridge:all");
 	if (!bridge_topic_all) {
 		return -1;
 	}
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index ee8a1dd..6be4bf1 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -948,10 +948,11 @@
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
 {
 	struct stasis_caching_topic *caching_topic;
+	static int caching_id;
 	char *new_name;
 	int ret;
 
-	ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
+	ret = ast_asprintf(&new_name, "cache:%d/%s", ast_atomic_fetchadd_int(&caching_id, +1), stasis_topic_name(original_topic));
 	if (ret < 0) {
 		return NULL;
 	}
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index 04d8164..463be69 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -67,13 +67,14 @@
 {
 	char *cached_name = NULL;
 	struct stasis_cp_all *all;
+	static int cache_id;
 
 	all = ao2_t_alloc(sizeof(*all), all_dtor, name);
 	if (!all) {
 		return NULL;
 	}
 
-	ast_asprintf(&cached_name, "%s-cached", name);
+	ast_asprintf(&cached_name, "cache_pattern:%d/%s", ast_atomic_fetchadd_int(&cache_id, +1), name);
 	if (!cached_name) {
 		ao2_ref(all, -1);
 
diff --git a/main/stasis_channels.c b/main/stasis_channels.c
index d39fb08..e8842c1 100644
--- a/main/stasis_channels.c
+++ b/main/stasis_channels.c
@@ -1658,7 +1658,7 @@
 
 	ast_register_cleanup(stasis_channels_cleanup);
 
-	channel_topic_all = stasis_topic_create("ast_channel_topic_all");
+	channel_topic_all = stasis_topic_create("channel:all");
 	if (!channel_topic_all) {
 		return -1;
 	}
diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c
index b3a837b..289a90e 100644
--- a/main/stasis_endpoints.c
+++ b/main/stasis_endpoints.c
@@ -460,7 +460,7 @@
 	int res = 0;
 	ast_register_cleanup(endpoints_stasis_cleanup);
 
-	endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
+	endpoint_cache_all = stasis_cp_all_create("endpoint:all",
 		endpoint_snapshot_get_id);
 	if (!endpoint_cache_all) {
 		return -1;
diff --git a/main/stasis_system.c b/main/stasis_system.c
index 961a2b0..4c84f57 100644
--- a/main/stasis_system.c
+++ b/main/stasis_system.c
@@ -374,7 +374,7 @@
 {
 	ast_register_cleanup(stasis_system_cleanup);
 
-	system_topic = stasis_topic_create("ast_system");
+	system_topic = stasis_topic_create("system:all");
 	if (!system_topic) {
 		return 1;
 	}
diff --git a/main/test.c b/main/test.c
index 2abe698..32df829 100644
--- a/main/test.c
+++ b/main/test.c
@@ -1224,7 +1224,7 @@
 	ast_register_cleanup(test_cleanup);
 
 	/* Create stasis topic */
-	test_suite_topic = stasis_topic_create("test_suite_topic");
+	test_suite_topic = stasis_topic_create("testsuite:all");
 	if (!test_suite_topic) {
 		return -1;
 	}
diff --git a/res/res_corosync.c b/res/res_corosync.c
index bf172e3..6e66c4f 100644
--- a/res/res_corosync.c
+++ b/res/res_corosync.c
@@ -1131,7 +1131,7 @@
 		goto failed;
 	}
 
-	corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
+	corosync_aggregate_topic = stasis_topic_create("corosync:aggregator");
 	if (!corosync_aggregate_topic) {
 		ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
 		goto failed;
diff --git a/res/stasis/app.c b/res/stasis/app.c
index 0f0923a..a69ca55 100644
--- a/res/stasis/app.c
+++ b/res/stasis/app.c
@@ -919,6 +919,8 @@
 	int res = 0;
 	size_t context_size = strlen("stasis-") + strlen(name) + 1;
 	char context_name[context_size];
+	char *topic_name;
+	int ret;
 
 	ast_assert(name != NULL);
 	ast_assert(handler != NULL);
@@ -939,7 +941,13 @@
 		return NULL;
 	}
 
-	app->topic = stasis_topic_create(name);
+	ret = ast_asprintf(&topic_name, "ari:application/%s", name);
+	if (ret < 0) {
+		return NULL;
+	}
+
+	app->topic = stasis_topic_create(topic_name);
+	ast_free(topic_name);
 	if (!app->topic) {
 		return NULL;
 	}

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/11130
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Change-Id: I484e971a38c3640f2bd156282e532eed84bf220d
Gerrit-Change-Number: 11130
Gerrit-PatchSet: 1
Gerrit-Owner: Joshua C. Colp <jcolp at digium.com>
Gerrit-MessageType: newchange
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20190311/c00c9b32/attachment-0001.html>


More information about the asterisk-code-review mailing list