[asterisk-commits] dlee: branch dlee/taskprocessor-optimization r399651 - in /team/dlee/taskproc...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Sep 23 23:31:27 CDT 2013


Author: dlee
Date: Mon Sep 23 23:31:25 2013
New Revision: 399651

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399651
Log:
Merged revisions 399640-399641,399644-399650 from http://svn.asterisk.org/svn/asterisk/team/dlee/performance

Added:
    team/dlee/taskprocessor-optimization/configs/stasis.conf.sample
      - copied unchanged from r399650, team/dlee/performance/configs/stasis.conf.sample
    team/dlee/taskprocessor-optimization/main/stasis_config.c
      - copied unchanged from r399650, team/dlee/performance/main/stasis_config.c
Modified:
    team/dlee/taskprocessor-optimization/   (props changed)
    team/dlee/taskprocessor-optimization/include/asterisk/stasis.h
    team/dlee/taskprocessor-optimization/include/asterisk/stasis_message_router.h
    team/dlee/taskprocessor-optimization/main/stasis.c
    team/dlee/taskprocessor-optimization/main/stasis_message_router.c
    team/dlee/taskprocessor-optimization/main/taskprocessor.c
    team/dlee/taskprocessor-optimization/tests/test_stasis.c

Propchange: team/dlee/taskprocessor-optimization/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Sep 23 23:31:25 2013
@@ -1,1 +1,1 @@
-/team/dlee/performance:1-399637
+/team/dlee/performance:1-399650

Modified: team/dlee/taskprocessor-optimization/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/include/asterisk/stasis.h?view=diff&rev=399651&r1=399650&r2=399651
==============================================================================
--- team/dlee/taskprocessor-optimization/include/asterisk/stasis.h (original)
+++ team/dlee/taskprocessor-optimization/include/asterisk/stasis.h Mon Sep 23 23:31:25 2013
@@ -884,6 +884,16 @@
  */
 int stasis_wait_init(void);
 
+struct ast_threadpool_options;
+
+/*!
+ * \internal
+ * \brief Retrieves the Stasis threadpool configuration.
+ * \param[out] threadpool_options Filled with Stasis threadpool options.
+ */
+void stasis_config_get_threadpool_options(
+	struct ast_threadpool_options *threadpool_options);
+
 /*! @} */
 
 /*!

Modified: team/dlee/taskprocessor-optimization/include/asterisk/stasis_message_router.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/include/asterisk/stasis_message_router.h?view=diff&rev=399651&r1=399650&r2=399651
==============================================================================
--- team/dlee/taskprocessor-optimization/include/asterisk/stasis_message_router.h (original)
+++ team/dlee/taskprocessor-optimization/include/asterisk/stasis_message_router.h Mon Sep 23 23:31:25 2013
@@ -100,6 +100,9 @@
  * updates for types not handled by routes added with
  * stasis_message_router_add_cache_update().
  *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
+ *
  * \param router Router to add the route to.
  * \param message_type Type of message to route.
  * \param callback Callback to forward messages of \a message_type to.
@@ -120,6 +123,9 @@
  * A particular \a message_type may have at most one cache route per \a router.
  * These are distinct from regular routes, so one could have both a regular
  * route and a cache route for the same \a message_type.
+ *
+ * Adding multiple routes for the same message type results in undefined
+ * behavior.
  *
  * \param router Router to add the route to.
  * \param message_type Subtype of cache update to route.

Modified: team/dlee/taskprocessor-optimization/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/main/stasis.c?view=diff&rev=399651&r1=399650&r2=399651
==============================================================================
--- team/dlee/taskprocessor-optimization/main/stasis.c (original)
+++ team/dlee/taskprocessor-optimization/main/stasis.c Mon Sep 23 23:31:25 2013
@@ -34,6 +34,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
+#include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
@@ -133,6 +134,9 @@
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
+/*! Threadpool for dispatching notifications to subscribers */
+static struct ast_threadpool *pool;
+
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
@@ -282,15 +286,7 @@
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 	if (needs_mailbox) {
-		/* With a small number of subscribers, a thread-per-sub is
-		 * acceptable. If our usage changes so that we have larger
-		 * numbers of subscribers, we'll probably want to consider
-		 * a threadpool. We had that originally, but with so few
-		 * subscribers it was actually a performance loss instead of
-		 * a gain.
-		 */
-		sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
-			TPS_REF_DEFAULT);
+		sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
 		if (!sub->mailbox) {
 			return NULL;
 		}
@@ -735,6 +731,13 @@
 	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
 }
 
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+	ast_threadpool_shutdown(pool);
+	pool = NULL;
+}
+
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
@@ -745,14 +748,36 @@
 {
 	int cache_init;
 
+	struct ast_threadpool_options opts;
+
 	/* Be sure the types are cleaned up after the message bus */
 	ast_register_cleanup(stasis_cleanup);
+	ast_register_atexit(stasis_exit);
+
+	if (stasis_config_init() != 0) {
+		ast_log(LOG_ERROR, "Stasis configuration failed\n");
+		return -1;
+	}
 
 	if (stasis_wait_init() != 0) {
 		ast_log(LOG_ERROR, "Stasis initialization failed\n");
 		return -1;
 	}
 
+	if (pool) {
+		ast_log(LOG_ERROR, "Stasis double-initialized\n");
+		return -1;
+	}
+
+	stasis_config_get_threadpool_options(&opts);
+	ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
+		opts.initial_size, opts.max_size, opts.idle_timeout);
+	pool = ast_threadpool_create("stasis-core", NULL, &opts);
+	if (!pool) {
+		ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
+		return -1;
+	}
+
 	cache_init = stasis_cache_init();
 	if (cache_init != 0) {
 		return -1;

Modified: team/dlee/taskprocessor-optimization/main/stasis_message_router.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/main/stasis_message_router.c?view=diff&rev=399651&r1=399650&r2=399651
==============================================================================
--- team/dlee/taskprocessor-optimization/main/stasis_message_router.c (original)
+++ team/dlee/taskprocessor-optimization/main/stasis_message_router.c Mon Sep 23 23:31:25 2013
@@ -53,12 +53,28 @@
 	struct stasis_message_route routes[];
 };
 
+static int table_has_route(struct route_table *table,
+	struct stasis_message_type *message_type)
+{
+	size_t idx;
+
+	for (idx = 0; idx < table->current_size; ++idx) {
+		if (table->routes[idx].message_type == message_type) {
+			return 1;
+		}
+	}
+
+	return 0;
+}
+
 static int table_add_route(struct route_table **table_ptr,
 	struct stasis_message_type *message_type,
 	stasis_subscription_cb callback, void *data)
 {
 	struct route_table *table = *table_ptr;
 	struct stasis_message_route *route;
+
+	ast_assert(!table_has_route(table, message_type));
 
 	if (table->current_size + 1 > table->max_size) {
 		size_t new_max_size = table->max_size ? table->max_size * 2 : 1;

Modified: team/dlee/taskprocessor-optimization/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/main/taskprocessor.c?view=diff&rev=399651&r1=399650&r2=399651
==============================================================================
--- team/dlee/taskprocessor-optimization/main/taskprocessor.c (original)
+++ team/dlee/taskprocessor-optimization/main/taskprocessor.c Mon Sep 23 23:31:25 2013
@@ -356,8 +356,8 @@
 	char name[256];
 	int tcount;
 	unsigned long qsize;
-	unsigned long maxqsize = 0;
-	unsigned long processed = 0;
+	unsigned long maxqsize;
+	unsigned long processed;
 	struct ast_taskprocessor *p;
 	struct ao2_iterator i;
 
@@ -378,8 +378,6 @@
 	ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
 	i = ao2_iterator_init(tps_singletons, 0);
 	while ((p = ao2_iterator_next(&i))) {
-		ast_assert(p->stats != NULL);
-
 		ast_copy_string(name, p->name, sizeof(name));
 		qsize = p->tps_queue_size;
 		maxqsize = p->stats->max_qsize;
@@ -423,15 +421,12 @@
 	}
 	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
 	/* free it */
-	ast_free(t->stats);
-	t->stats = NULL;
-
+	if (t->stats) {
+		ast_free(t->stats);
+		t->stats = NULL;
+	}
 	ast_free((char *) t->name);
 	if (t->listener) {
-		/* This code should not be reached since the listener
-		 * should have been destroyed before the taskprocessor could
-		 * be destroyed
-		 */
 		ao2_ref(t->listener, -1);
 		t->listener = NULL;
 	}
@@ -444,6 +439,7 @@
 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
 {
 	struct tps_task *task;
+	SCOPED_AO2LOCK(lock, tps);
 
 	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
 		tps->tps_queue_size--;
@@ -529,7 +525,6 @@
 		ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
 		return NULL;
 	}
-
 	if (!(p->name = ast_strdup(name))) {
 		ao2_ref(p, -1);
 		return NULL;
@@ -651,7 +646,6 @@
 		return -1;
 	}
 	ao2_lock(tps);
-	ao2_ref(tps, +1); /* Let's say the queued task has a reference */
 	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
 	previous_size = tps->tps_queue_size++;
 	/* The currently executing task counts as still in queue */
@@ -666,23 +660,16 @@
 	struct tps_task *t;
 	int size;
 
-	ast_assert(tps->stats != NULL);
-
 	ao2_lock(tps);
-	t = tps_taskprocessor_pop(tps);
-
-	/* Empty queue; return false */
-	if (!t) {
-		ao2_unlock(tps);
-
-		/* Since there was no task, no need to decrement the refcount */
-		return 0;
-	}
 	tps->executing = 1;
 	ao2_unlock(tps);
 
-	t->execute(t->datap);
-	tps_task_free(t);
+	t = tps_taskprocessor_pop(tps);
+
+	if (t) {
+		t->execute(t->datap);
+		tps_task_free(t);
+	}
 
 	ao2_lock(tps);
 	/* We need to check size in the same critical section where we reset the
@@ -692,17 +679,17 @@
 	tps->executing = 0;
 	size = tps_taskprocessor_depth(tps);
 	/* If we executed a task, bump the stats */
-	tps->stats->_tasks_processed_count++;
-	if (size > tps->stats->max_qsize) {
-		tps->stats->max_qsize = size;
+	if (t && tps->stats) {
+		tps->stats->_tasks_processed_count++;
+		if (size > tps->stats->max_qsize) {
+			tps->stats->max_qsize = size;
+		}
 	}
 	ao2_unlock(tps);
 
 	/* If we executed a task, check for the transition to empty */
-	if (size == 0 && tps->listener->callbacks->emptied) {
+	if (t && size == 0 && tps->listener->callbacks->emptied) {
 		tps->listener->callbacks->emptied(tps->listener);
 	}
-
-	ao2_ref(tps, -1); /* task no longer has a reference */
 	return size > 0;
 }

Modified: team/dlee/taskprocessor-optimization/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/tests/test_stasis.c?view=diff&rev=399651&r1=399650&r2=399651
==============================================================================
--- team/dlee/taskprocessor-optimization/tests/test_stasis.c (original)
+++ team/dlee/taskprocessor-optimization/tests/test_stasis.c Mon Sep 23 23:31:25 2013
@@ -863,52 +863,6 @@
 	ao2_cleanup(cache_dump);
 	cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
 	ast_test_validate(test, 0 == ao2_container_count(cache_dump));
-
-	return AST_TEST_PASS;
-}
-
-AST_TEST_DEFINE(route_conflicts)
-{
-	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
-	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
-	RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
-	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
-	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
-	int ret;
-
-	switch (cmd) {
-	case TEST_INIT:
-		info->name = __func__;
-		info->category = test_category;
-		info->summary =
-			"Multiple routes to the same message_type should fail";
-		info->description =
-			"Multiple routes to the same message_type should fail";
-		return AST_TEST_NOT_RUN;
-	case TEST_EXECUTE:
-		break;
-	}
-
-	topic = stasis_topic_create("TestTopic");
-	ast_test_validate(test, NULL != topic);
-
-	consumer1 = consumer_create(1);
-	ast_test_validate(test, NULL != consumer1);
-	consumer2 = consumer_create(1);
-	ast_test_validate(test, NULL != consumer2);
-
-	test_message_type = stasis_message_type_create("TestMessage", NULL);
-	ast_test_validate(test, NULL != test_message_type);
-
-	uut = stasis_message_router_create(topic);
-	ast_test_validate(test, NULL != uut);
-
-	ret = stasis_message_router_add(
-		uut, test_message_type, consumer_exec, consumer1);
-	ast_test_validate(test, 0 == ret);
-	ret = stasis_message_router_add(
-		uut, test_message_type, consumer_exec, consumer2);
-	ast_test_validate(test, 0 != ret);
 
 	return AST_TEST_PASS;
 }
@@ -1373,7 +1327,6 @@
 	AST_TEST_UNREGISTER(cache_filter);
 	AST_TEST_UNREGISTER(cache);
 	AST_TEST_UNREGISTER(cache_dump);
-	AST_TEST_UNREGISTER(route_conflicts);
 	AST_TEST_UNREGISTER(router);
 	AST_TEST_UNREGISTER(router_cache_updates);
 	AST_TEST_UNREGISTER(interleaving);
@@ -1397,7 +1350,6 @@
 	AST_TEST_REGISTER(cache_filter);
 	AST_TEST_REGISTER(cache);
 	AST_TEST_REGISTER(cache_dump);
-	AST_TEST_REGISTER(route_conflicts);
 	AST_TEST_REGISTER(router);
 	AST_TEST_REGISTER(router_cache_updates);
 	AST_TEST_REGISTER(interleaving);




More information about the asterisk-commits mailing list