[Asterisk-code-review] stasis.c: Added topic_all container (...asterisk[master])

George Joseph asteriskteam at digium.com
Mon Apr 8 10:53:18 CDT 2019


George Joseph has submitted this change and it was merged. ( https://gerrit.asterisk.org/c/asterisk/+/10929 )

Change subject: stasis.c: Added topic_all container
......................................................................

stasis.c: Added topic_all container

Added the topic_all container to keep track of every topic in one place.
This makes the topics easier to manage.

Added cli commands.
stasis show topics : It shows all registered topics.
stasis show topic <name> : It shows the specified topic's detailed info.

ASTERISK-28264

Change-Id: Ie86d125d2966f93de74ee00f47ae6fbc8c081c5f
---
M include/asterisk/stasis.h
M main/stasis.c
2 files changed, 350 insertions(+), 16 deletions(-)

Approvals:
  Joshua Colp: Looks good to me, but someone else must approve
  George Joseph: Looks good to me, approved; Approved for Submit



diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 0b229bf..8e9c6c7 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -520,15 +520,56 @@
 struct stasis_topic *stasis_topic_create(const char *name);
 
 /*!
+ * \brief Create a new topic with given detail.
+ * \param name Name of the new topic.
+ * \param detail Detail description of the new topic, e.g. "Queue main topic for subscribing every queue event"
+ * \return New topic instance.
+ * \return \c NULL on error.
+ *
+ * \note There is no explicit ability to unsubscribe all subscribers
+ * from a topic and destroy it. As a result the topic can persist until
+ * the last subscriber unsubscribes itself even if there is no
+ * publisher.
+ */
+struct stasis_topic *stasis_topic_create_with_detail(
+		const char *name, const char *detail);
+
+/*!
+ * \brief Get a topic of the given name.
+ * \param name Topic's name.
+ * \return Topic instance with the given name.
+ * \return \c NULL on error or if no such topic exists.
+ *
+ * \note This SHOULD NOT be used in normal operation for publishing messages.
+ */
+struct stasis_topic *stasis_topic_get(const char *name);
+
+/*!
+ * \brief Return the uniqueid of a topic.
+ * \param topic Topic.
+ * \return Uniqueid of the topic.
+ * \return \c NULL if topic is \c NULL.
+ */
+const char *stasis_topic_uniqueid(const struct stasis_topic *topic);
+
+/*!
  * \brief Return the name of a topic.
  * \param topic Topic.
  * \return Name of the topic.
  * \return \c NULL if topic is \c NULL.
- * \since 12
  */
 const char *stasis_topic_name(const struct stasis_topic *topic);
 
 /*!
+ * \brief Return the detail of a topic.
+ * \param topic Topic.
+ * \return Detail of the topic.
+ * \return \c NULL if topic is \c NULL.
+ * \since 12
+ */
+const char *stasis_topic_detail(const struct stasis_topic *topic);
+
+/*!
  * \brief Return the number of subscribers of a topic.
  * \param topic Topic.
  * \return Number of subscribers of the topic.
diff --git a/main/stasis.c b/main/stasis.c
index 7dd3893..4ce7052 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -41,9 +41,7 @@
 #include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_endpoints.h"
 #include "asterisk/config_options.h"
-#ifdef AST_DEVMODE
 #include "asterisk/cli.h"
-#endif
 
 /*** DOCUMENTATION
 	<managerEvent language="en_US" name="UserEvent">
@@ -307,6 +305,16 @@
 
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
+#if defined(LOW_MEMORY)
+
+#define TOPIC_ALL_BUCKETS 257
+
+#else
+
+#define TOPIC_ALL_BUCKETS 997
+
+#endif
+
 #ifdef AST_DEVMODE
 
 /*! The number of buckets to use for topic statistics */
@@ -372,9 +380,37 @@
 	int subscriber_id;
 
 	/*! Name of the topic */
-	char name[0];
+	char *name;
+
+	/*! Detail of the topic */
+	char *detail;
+
+	/*! Creation time */
+	struct timeval *creationtime;
 };
 
+struct ao2_container *topic_all;
+
+struct topic_proxy {
+	AO2_WEAKPROXY();
+
+	char *name;
+	char *detail;
+
+	struct timeval creationtime;
+
+	char buf[0];
+};
+
+AO2_STRING_FIELD_HASH_FN(topic_proxy, name);
+AO2_STRING_FIELD_CMP_FN(topic_proxy, name);
+AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name);
+
+static void proxy_dtor(void *weakproxy, void *data)
+{
+	ao2_unlink(topic_all, weakproxy);
+}
+
 /* Forward declarations for the tightly-coupled subscription object */
 static int topic_add_subscription(struct stasis_topic *topic,
 	struct stasis_subscription *sub);
@@ -394,6 +430,9 @@
 {
 	struct stasis_topic *topic = obj;
 
+	ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
+			topic->name, topic->detail);
+
 	/* Subscribers hold a reference to topics, so they should all be
 	 * unsubscribed before we get here. */
 	ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
@@ -442,40 +481,145 @@
 }
 #endif
 
-struct stasis_topic *stasis_topic_create(const char *name)
+static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
+{
+	struct topic_proxy *proxy;
+	struct stasis_topic* topic_tmp;
+
+	if (!topic || !name || !strlen(name) || !detail) {
+		return -1;
+	}
+
+	ao2_wrlock(topic_all);
+
+	topic_tmp = stasis_topic_get(name);
+	if (topic_tmp) {
+		ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
+		ao2_ref(topic_tmp, -1);
+		ao2_unlock(topic_all);
+
+		return -1;
+	}
+
+	proxy = ao2_t_weakproxy_alloc(
+			sizeof(*proxy) + strlen(name) + 1 + strlen(detail) + 1, NULL, topic->name);
+	if (!proxy) {
+		ao2_unlock(topic_all);
+
+		return -1;
+	}
+
+	/* set the proxy info */
+	proxy->name = proxy->buf;
+	proxy->detail = proxy->name + strlen(name) + 1;
+
+	strcpy(proxy->name, name); /* SAFE */
+	strcpy(proxy->detail, detail); /* SAFE */
+	proxy->creationtime = ast_tvnow();
+
+	/* We have exclusive access to proxy, no need for locking here. */
+	if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
+		ao2_cleanup(proxy);
+		ao2_unlock(topic_all);
+
+		return -1;
+	}
+
+	if (ao2_weakproxy_subscribe(proxy, proxy_dtor, NULL, OBJ_NOLOCK)) {
+		ao2_cleanup(proxy);
+		ao2_unlock(topic_all);
+
+		return -1;
+	}
+
+	/* setting the topic point to the proxy */
+	topic->name = proxy->name;
+	topic->detail = proxy->detail;
+	topic->creationtime = &(proxy->creationtime);
+
+	ao2_link_flags(topic_all, proxy, OBJ_NOLOCK);
+	ao2_ref(proxy, -1);
+
+	ao2_unlock(topic_all);
+
+	return 0;
+}
+
+struct stasis_topic *stasis_topic_create_with_detail(
+		const char *name, const char* detail
+		)
 {
 	struct stasis_topic *topic;
 	int res = 0;
 
-	topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name);
+	if (!name|| !strlen(name) || !detail) {
+		return NULL;
+	}
+	ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
+
+	topic = stasis_topic_get(name);
+	if (topic) {
+		ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
+				name, detail);
+		return topic;
+	}
+
+	topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
 	if (!topic) {
 		return NULL;
 	}
 
-	strcpy(topic->name, name); /* SAFE */
 	res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
 	res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
-	ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
-
-#ifdef AST_DEVMODE
-	topic->statistics = stasis_topic_statistics_create(topic);
-	if (!topic->name || !topic->statistics || res)
-#else
-	if (!topic->name || res)
-#endif
-	{
+	if (res) {
 		ao2_ref(topic, -1);
 		return NULL;
 	}
 
+	/* link to the proxy */
+	if (link_topic_proxy(topic, name, detail)) {
+		ao2_ref(topic, -1);
+		return NULL;
+	}
+
+#ifdef AST_DEVMODE
+	topic->statistics = stasis_topic_statistics_create(topic);
+	if (!topic->statistics) {
+		ao2_ref(topic, -1);
+		return NULL;
+	}
+#endif
+	ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
+
 	return topic;
 }
 
+struct stasis_topic *stasis_topic_create(const char *name)
+{
+	return stasis_topic_create_with_detail(name, "");
+}
+
+struct stasis_topic *stasis_topic_get(const char *name)
+{
+	return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
+}
+
 const char *stasis_topic_name(const struct stasis_topic *topic)
 {
+	if (!topic) {
+		return NULL;
+	}
 	return topic->name;
 }
 
+const char *stasis_topic_detail(const struct stasis_topic *topic)
+{
+	if (!topic) {
+		return NULL;
+	}
+	return topic->detail;
+}
+
 size_t stasis_topic_subscribers(const struct stasis_topic *topic)
 {
 	return AST_VECTOR_SIZE(&topic->subscribers);
@@ -2134,6 +2278,142 @@
 
 /*! @} */
 
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis show topics'
+ */
+static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct ao2_iterator iter;
+	struct topic_proxy *topic;
+	struct ao2_container *tmp_container;
+	int count = 0;
+#define FMT_HEADERS		"%-64s %-64s\n"
+#define FMT_FIELDS		"%-64s %-64s\n"
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis show topics";
+		e->usage =
+			"Usage: stasis show topics\n"
+			"	Shows a list of topics\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != e->args) {
+		return CLI_SHOWUSAGE;
+	}
+
+	ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
+
+	tmp_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
+				topic_proxy_sort_fn, NULL);
+
+	if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
+		ao2_cleanup(tmp_container);
+
+		return NULL;
+	}
+
+	/* getting all topic in order */
+	iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
+	while ((topic = ao2_iterator_next(&iter))) {
+		ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
+		ao2_ref(topic, -1);
+		++count;
+	}
+	ao2_iterator_destroy(&iter);
+	ao2_cleanup(tmp_container);
+
+	ast_cli(a->fd, "\n%d Total topics\n\n", count);
+
+#undef FMT_HEADERS
+#undef FMT_FIELDS
+
+	return CLI_SUCCESS;
+}
+
+/*!
+ * \internal
+ * \brief CLI tab completion for topic names
+ */
+static char *topic_complete_name(const char *word)
+{
+	struct topic_proxy *topic;
+	struct ao2_iterator it;
+	int wordlen = strlen(word);
+	int ret;
+
+	it = ao2_iterator_init(topic_all, 0);
+	while ((topic = ao2_iterator_next(&it))) {
+		if (!strncasecmp(word, topic->name, wordlen)) {
+			ret = ast_cli_completion_add(ast_strdup(topic->name));
+			if (ret) {
+				ao2_ref(topic, -1);
+				break;
+			}
+		}
+		ao2_ref(topic, -1);
+	}
+	ao2_iterator_destroy(&it);
+	return NULL;
+}
+
+/*!
+ * \internal
+ * \brief CLI command implementation for 'stasis show topic'
+ */
+static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	struct stasis_topic *topic;
+	char print_time[32];
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "stasis show topic";
+		e->usage =
+		    "Usage: stasis show topic <name>\n"
+		    "       Show stasis topic detail info.\n";
+		return NULL;
+	case CLI_GENERATE:
+		if (a->pos == 3) {
+			return topic_complete_name(a->word);
+		} else {
+			return NULL;
+		}
+	}
+
+	if (a->argc != 4) {
+		return CLI_SHOWUSAGE;
+	}
+
+	topic = stasis_topic_get(a->argv[3]);
+	if (!topic) {
+		ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
+		return CLI_FAILURE;
+	}
+
+	ast_cli(a->fd, "Name: %s\n", topic->name);
+	ast_cli(a->fd, "Detail: %s\n", topic->detail);
+	ast_cli(a->fd, "Subscribers count: %lu\n", AST_VECTOR_SIZE(&topic->subscribers));
+	ast_cli(a->fd, "Forwarding topic count: %lu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
+	ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
+	ast_cli(a->fd, "Duration time: %s\n", print_time);
+
+	ao2_ref(topic, -1);
+
+	return CLI_SUCCESS;
+}
+
+
+static struct ast_cli_entry cli_stasis[] = {
+	AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
+	AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
+};
+
+
 #ifdef AST_DEVMODE
 
 AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
@@ -2646,6 +2926,9 @@
 	ao2_cleanup(subscription_statistics);
 	ao2_cleanup(topic_statistics);
 #endif
+	ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
+	ao2_cleanup(topic_all);
+	topic_all = NULL;
 	ast_threadpool_shutdown(pool);
 	pool = NULL;
 	STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
@@ -2740,6 +3023,16 @@
 		return -1;
 	}
 
+	topic_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_ALL_BUCKETS,
+			topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
+	if (!topic_all) {
+		return -1;
+	}
+
+	if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
+		return -1;
+	}
+
 #ifdef AST_DEVMODE
 	/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
 	 * topic or subscripton.

-- 
To view, visit https://gerrit.asterisk.org/c/asterisk/+/10929
To unsubscribe, or for help writing mail filters, visit https://gerrit.asterisk.org/settings

Gerrit-Project: asterisk
Gerrit-Branch: master
Gerrit-Change-Id: Ie86d125d2966f93de74ee00f47ae6fbc8c081c5f
Gerrit-Change-Number: 10929
Gerrit-PatchSet: 21
Gerrit-Owner: sungtae kim <pchero21 at gmail.com>
Gerrit-Reviewer: Corey Farrell <git at cfware.com>
Gerrit-Reviewer: Friendly Automation
Gerrit-Reviewer: George Joseph <gjoseph at digium.com>
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: sungtae kim <pchero21 at gmail.com>
Gerrit-MessageType: merged
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.digium.com/pipermail/asterisk-code-review/attachments/20190408/0bcc951c/attachment-0001.html>


More information about the asterisk-code-review mailing list