[asterisk-commits] dlee: branch 12 r400178 - in /branches/12: ./ configs/ include/asterisk/ main/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Sep 30 13:26:31 CDT 2013


Author: dlee
Date: Mon Sep 30 13:26:27 2013
New Revision: 400178

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=400178
Log:
Taskprocessor optimization; switch Stasis to use taskprocessors

This patch optimizes taskprocessor to use a semaphore for signaling,
since the OS can do a better job of managing contention and waiting
than we can with a mutex and condition.

The taskprocessor execution was also slightly optimized to reduce the
number of locks taken.

The only observable difference in the taskprocessor implementation is
that when the final reference to the taskprocessor goes away, it will
execute all tasks to completion instead of discarding the unexecuted
tasks.

For systems where unnamed semaphores are not supported, a really
simple semaphore implementation is provided (which gives performance
identical to the original taskprocessor implementation).

The way we ended up implementing Stasis caused the threadpool to be a
burden instead of a boost to performance. This was switched to just
use taskprocessors directly for subscriptions.

Review: https://reviewboard.asterisk.org/r/2881/

Added:
    branches/12/include/asterisk/sem.h
      - copied unchanged from r400177, team/dlee/taskprocessor-optimization/include/asterisk/sem.h
    branches/12/main/sem.c
      - copied unchanged from r400177, team/dlee/taskprocessor-optimization/main/sem.c
Removed:
    branches/12/configs/stasis.conf.sample
    branches/12/main/stasis_config.c
Modified:
    branches/12/configure
    branches/12/configure.ac
    branches/12/include/asterisk/autoconfig.h.in
    branches/12/include/asterisk/stasis.h
    branches/12/include/asterisk/taskprocessor.h
    branches/12/main/stasis.c
    branches/12/main/taskprocessor.c

Modified: branches/12/configure.ac
URL: http://svnview.digium.com/svn/asterisk/branches/12/configure.ac?view=diff&rev=400178&r1=400177&r2=400178
==============================================================================
--- branches/12/configure.ac (original)
+++ branches/12/configure.ac Mon Sep 30 13:26:27 2013
@@ -808,11 +808,12 @@
 AC_MSG_RESULT(no)
 )
 
-AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 save_LIBS="$LIBS"
 save_CFLAGS="$CFLAGS"
 LIBS="$PTHREAD_LIBS $LIBS"
 CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 AC_LINK_IFELSE(
   [AC_LANG_PROGRAM(
     [#include <pthread.h>
@@ -826,6 +827,17 @@
     ac_cv_pthread_rwlock_timedwrlock="no"
   ]
 )
+
+# Some platforms define sem_init(), but only support sem_open(). joyous.
+AC_MSG_CHECKING(for working unnamed semaphores)
+AC_RUN_IFELSE(
+	[AC_LANG_PROGRAM([#include <semaphore.h>],
+		[sem_t sem; return sem_init(&sem, 0, 0);])],
+	AC_MSG_RESULT(yes)
+	AC_DEFINE([HAS_WORKING_SEMAPHORE], 1, [Define to 1 if anonymous semaphores work.]),
+	AC_MSG_RESULT(no)
+)
+
 LIBS="$save_LIBS"
 CFLAGS="$save_CFLAGS"
 if test "${ac_cv_pthread_rwlock_timedwrlock}" = "yes"; then

Modified: branches/12/include/asterisk/autoconfig.h.in
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/autoconfig.h.in?view=diff&rev=400178&r1=400177&r2=400178
==============================================================================
--- branches/12/include/asterisk/autoconfig.h.in (original)
+++ branches/12/include/asterisk/autoconfig.h.in Mon Sep 30 13:26:27 2013
@@ -28,6 +28,9 @@
 
 /* Define to 1 if using `alloca.c'. */
 #undef C_ALLOCA
+
+/* Define to 1 if anonymous semaphores work. */
+#undef HAS_WORKING_SEMAPHORE
 
 /* Define to 1 if you have the `acos' function. */
 #undef HAVE_ACOS

Modified: branches/12/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/stasis.h?view=diff&rev=400178&r1=400177&r2=400178
==============================================================================
--- branches/12/include/asterisk/stasis.h (original)
+++ branches/12/include/asterisk/stasis.h Mon Sep 30 13:26:27 2013
@@ -884,16 +884,6 @@
  */
 int stasis_wait_init(void);
 
-struct ast_threadpool_options;
-
-/*!
- * \internal
- * \brief Retrieves the Stasis threadpool configuration.
- * \param[out] threadpool_options Filled with Stasis threadpool options.
- */
-void stasis_config_get_threadpool_options(
-	struct ast_threadpool_options *threadpool_options);
-
 /*! @} */
 
 /*!

Modified: branches/12/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/branches/12/include/asterisk/taskprocessor.h?view=diff&rev=400178&r1=400177&r2=400178
==============================================================================
--- branches/12/include/asterisk/taskprocessor.h (original)
+++ branches/12/include/asterisk/taskprocessor.h Mon Sep 30 13:26:27 2013
@@ -109,6 +109,7 @@
 	 * \param listener The listener
 	 */
 	void (*shutdown)(struct ast_taskprocessor_listener *listener);
+	void (*dtor)(struct ast_taskprocessor_listener *listener);
 };
 
 /*!

Modified: branches/12/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/stasis.c?view=diff&rev=400178&r1=400177&r2=400178
==============================================================================
--- branches/12/main/stasis.c (original)
+++ branches/12/main/stasis.c Mon Sep 30 13:26:27 2013
@@ -34,7 +34,6 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
@@ -134,9 +133,6 @@
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
-/*! Threadpool for dispatching notifications to subscribers */
-static struct ast_threadpool *pool;
-
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
@@ -286,7 +282,15 @@
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 	if (needs_mailbox) {
-		sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+		/* With a small number of subscribers, a thread-per-sub is
+		 * acceptable. If our usage changes so that we have larger
+		 * numbers of subscribers, we'll probably want to consider
+		 * a threadpool. We had that originally, but with so few
+		 * subscribers it was actually a performance loss instead of
+		 * a gain.
+		 */
+		sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+			TPS_REF_DEFAULT);
 		if (!sub->mailbox) {
 			return NULL;
 		}
@@ -731,13 +735,6 @@
 	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
 }
 
-/*! \brief Shutdown function */
-static void stasis_exit(void)
-{
-	ast_threadpool_shutdown(pool);
-	pool = NULL;
-}
-
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
@@ -748,36 +745,14 @@
 {
 	int cache_init;
 
-	struct ast_threadpool_options opts;
-
 	/* Be sure the types are cleaned up after the message bus */
 	ast_register_cleanup(stasis_cleanup);
-	ast_register_atexit(stasis_exit);
-
-	if (stasis_config_init() != 0) {
-		ast_log(LOG_ERROR, "Stasis configuration failed\n");
-		return -1;
-	}
 
 	if (stasis_wait_init() != 0) {
 		ast_log(LOG_ERROR, "Stasis initialization failed\n");
 		return -1;
 	}
 
-	if (pool) {
-		ast_log(LOG_ERROR, "Stasis double-initialized\n");
-		return -1;
-	}
-
-	stasis_config_get_threadpool_options(&opts);
-	ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
-		opts.initial_size, opts.max_size, opts.idle_timeout);
-	pool = ast_threadpool_create("stasis-core", NULL, &opts);
-	if (!pool) {
-		ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
-		return -1;
-	}
-
 	cache_init = stasis_cache_init();
 	if (cache_init != 0) {
 		return -1;

Modified: branches/12/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/branches/12/main/taskprocessor.c?view=diff&rev=400178&r1=400177&r2=400178
==============================================================================
--- branches/12/main/taskprocessor.c (original)
+++ branches/12/main/taskprocessor.c Mon Sep 30 13:26:27 2013
@@ -37,6 +37,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/cli.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
 
 /*!
  * \brief tps_task structure is queued to a taskprocessor
@@ -113,9 +114,6 @@
 /*! \brief The astobj2 compare callback for taskprocessors */
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
 static void tps_taskprocessor_destroy(void *tps);
 
@@ -138,47 +136,56 @@
 
 struct default_taskprocessor_listener_pvt {
 	pthread_t poll_thread;
-	ast_mutex_t lock;
-	ast_cond_t cond;
-	int wake_up;
 	int dead;
+	struct ast_sem sem;
 };
 
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	pvt->wake_up = 1;
-	pvt->dead = should_die;
-	ast_cond_signal(&pvt->cond);
-}
-
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	while (!pvt->wake_up) {
-		ast_cond_wait(&pvt->cond, lock);
-	}
-	pvt->wake_up = 0;
-	return pvt->dead;
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+{
+	ast_assert(pvt->dead);
+	ast_sem_destroy(&pvt->sem);
+	ast_free(pvt);
+}
+
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
+{
+	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+	default_listener_pvt_destroy(pvt);
+
+	listener->user_data = NULL;
 }
 
 /*!
  * \brief Function that processes tasks in the taskprocessor
  * \internal
  */
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
 {
 	struct ast_taskprocessor_listener *listener = data;
 	struct ast_taskprocessor *tps = listener->tps;
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-	int dead = 0;
-
-	while (!dead) {
-		if (!ast_taskprocessor_execute(tps)) {
-			dead = default_tps_idle(pvt);
+	int sem_value;
+	int res;
+
+	while (!pvt->dead) {
+		res = ast_sem_wait(&pvt->sem);
+		if (res != 0 && errno != EINTR) {
+			ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+				strerror(errno));
+			/* Just give up */
+			break;
 		}
-	}
+		ast_taskprocessor_execute(tps);
+	}
+
+	/* No posting to a dead taskprocessor! */
+	res = ast_sem_getvalue(&pvt->sem, &sem_value);
+	ast_assert(res == 0 && sem_value == 0);
+
+	/* Free the shutdown reference (see default_listener_shutdown) */
+	ao2_t_ref(listener->tps, -1, "tps-shutdown");
+
 	return NULL;
 }
 
@@ -186,7 +193,7 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+	if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
 		return -1;
 	}
 
@@ -197,33 +204,50 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-	ast_assert(!pvt->dead);
-
-	if (was_empty) {
-		default_tps_wake_up(pvt, 0);
-	}
-}
-
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
-{
-	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
-	ast_free(pvt);
+	if (ast_sem_post(&pvt->sem) != 0) {
+		ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+			strerror(errno));
+	}
+}
+
+static int default_listener_die(void *data)
+{
+	struct default_taskprocessor_listener_pvt *pvt = data;
+	pvt->dead = 1;
+	return 0;
 }
 
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-	default_tps_wake_up(pvt, 1);
-	pthread_join(pvt->poll_thread, NULL);
+	int res;
+
+	/* Hold a reference during shutdown */
+	ao2_t_ref(listener->tps, +1, "tps-shutdown");
+
+	ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
+	if (pthread_self() == pvt->poll_thread) {
+		res = pthread_detach(pvt->poll_thread);
+		if (res != 0) {
+			ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+				strerror(errno));
+		}
+	} else {
+		res = pthread_join(pvt->poll_thread, NULL);
+		if (res != 0) {
+			ast_log(LOG_ERROR, "pthread_join(): %s\n",
+				strerror(errno));
+		}
+	}
 	pvt->poll_thread = AST_PTHREADT_NULL;
-	default_listener_pvt_destroy(pvt);
 }
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
 	.start = default_listener_start,
 	.task_pushed = default_task_pushed,
 	.shutdown = default_listener_shutdown,
+	.dtor = default_listener_pvt_dtor,
 };
 
 /*!
@@ -268,9 +292,7 @@
 /* release task resources */
 static void *tps_task_free(struct tps_task *task)
 {
-	if (task) {
-		ast_free(task);
-	}
+	ast_free(task);
 	return NULL;
 }
 
@@ -425,10 +447,8 @@
 	}
 	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
 	/* free it */
-	if (t->stats) {
-		ast_free(t->stats);
-		t->stats = NULL;
-	}
+	ast_free(t->stats);
+	t->stats = NULL;
 	ast_free((char *) t->name);
 	if (t->listener) {
 		ao2_ref(t->listener, -1);
@@ -443,7 +463,6 @@
 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
 {
 	struct tps_task *task;
-	SCOPED_AO2LOCK(lock, tps);
 
 	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
 		tps->tps_queue_size--;
@@ -472,10 +491,21 @@
 	ao2_ref(listener->tps, -1);
 }
 
+static void taskprocessor_listener_dtor(void *obj)
+{
+	struct ast_taskprocessor_listener *listener = obj;
+
+	if (listener->callbacks->dtor) {
+		listener->callbacks->dtor(listener);
+	}
+}
+
 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 {
 	RAII_VAR(struct ast_taskprocessor_listener *, listener,
-			ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+			NULL, ao2_cleanup);
+
+	listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
 
 	if (!listener) {
 		return NULL;
@@ -506,9 +536,12 @@
 	if (!pvt) {
 		return NULL;
 	}
-	ast_cond_init(&pvt->cond, NULL);
-	ast_mutex_init(&pvt->lock);
 	pvt->poll_thread = AST_PTHREADT_NULL;
+	if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+		ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+		ast_free(pvt);
+		return NULL;
+	}
 	return pvt;
 }
 
@@ -590,7 +623,6 @@
 
 	p = __allocate_taskprocessor(name, listener);
 	if (!p) {
-		default_listener_pvt_destroy(pvt);
 		ao2_ref(listener, -1);
 		return NULL;
 	}
@@ -662,15 +694,17 @@
 	int size;
 
 	ao2_lock(tps);
+	t = tps_taskprocessor_pop(tps);
+	if (!t) {
+		ao2_unlock(tps);
+		return 0;
+	}
+
 	tps->executing = 1;
 	ao2_unlock(tps);
 
-	t = tps_taskprocessor_pop(tps);
-
-	if (t) {
-		t->execute(t->datap);
-		tps_task_free(t);
-	}
+	t->execute(t->datap);
+	tps_task_free(t);
 
 	ao2_lock(tps);
 	/* We need to check size in the same critical section where we reset the
@@ -680,7 +714,7 @@
 	tps->executing = 0;
 	size = tps_taskprocessor_depth(tps);
 	/* If we executed a task, bump the stats */
-	if (t && tps->stats) {
+	if (tps->stats) {
 		tps->stats->_tasks_processed_count++;
 		if (size > tps->stats->max_qsize) {
 			tps->stats->max_qsize = size;
@@ -689,7 +723,7 @@
 	ao2_unlock(tps);
 
 	/* If we executed a task, check for the transition to empty */
-	if (t && size == 0 && tps->listener->callbacks->emptied) {
+	if (size == 0 && tps->listener->callbacks->emptied) {
 		tps->listener->callbacks->emptied(tps->listener);
 	}
 	return size > 0;




More information about the asterisk-commits mailing list