[asterisk-commits] dlee: branch group/performance r399661 - in /team/group/performance: ./ configs/ include/asterisk/ main/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Sep 24 10:58:12 CDT 2013


Author: dlee
Date: Tue Sep 24 10:58:10 2013
New Revision: 399661

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399661
Log:
Merged ^/team/dlee/taskprocessor-optimization at 399654

Added:
    team/group/performance/include/asterisk/sem.h
      - copied unchanged from r399660, team/dlee/taskprocessor-optimization/include/asterisk/sem.h
    team/group/performance/main/sem.c
      - copied unchanged from r399660, team/dlee/taskprocessor-optimization/main/sem.c
Removed:
    team/group/performance/configs/stasis.conf.sample
    team/group/performance/main/stasis_config.c
Modified:
    team/group/performance/   (props changed)
    team/group/performance/configure
    team/group/performance/configure.ac
    team/group/performance/include/asterisk/autoconfig.h.in
    team/group/performance/include/asterisk/stasis.h
    team/group/performance/include/asterisk/taskprocessor.h
    team/group/performance/main/stasis.c
    team/group/performance/main/taskprocessor.c

Propchange: team/group/performance/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Sep 24 10:58:10 2013
@@ -1,1 +1,1 @@
-/team/dlee/performance:1-399659
+/team/dlee/performance:1-399659 /team/dlee/taskprocessor-optimization:1-399654

Modified: team/group/performance/configure.ac
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/configure.ac?view=diff&rev=399661&r1=399660&r2=399661
==============================================================================
--- team/group/performance/configure.ac (original)
+++ team/group/performance/configure.ac Tue Sep 24 10:58:10 2013
@@ -808,11 +808,12 @@
 AC_MSG_RESULT(no)
 )
 
-AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 save_LIBS="$LIBS"
 save_CFLAGS="$CFLAGS"
 LIBS="$PTHREAD_LIBS $LIBS"
 CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 AC_LINK_IFELSE(
   [AC_LANG_PROGRAM(
     [#include <pthread.h>
@@ -826,6 +827,17 @@
     ac_cv_pthread_rwlock_timedwrlock="no"
   ]
 )
+
+# Some platforms define sem_init(), but only support sem_open(). joyous.
+AC_MSG_CHECKING(for working unnamed semaphores)
+AC_RUN_IFELSE(
+	[AC_LANG_PROGRAM([#include <semaphore.h>],
+		[sem_t sem; return sem_init(&sem, 0, 0);])],
+	AC_MSG_RESULT(yes)
+	AC_DEFINE([HAS_WORKING_SEMAPHORE], 1, [Define to 1 if anonymous semaphores work.]),
+	AC_MSG_RESULT(no)
+)
+
 LIBS="$save_LIBS"
 CFLAGS="$save_CFLAGS"
 if test "${ac_cv_pthread_rwlock_timedwrlock}" = "yes"; then

Modified: team/group/performance/include/asterisk/autoconfig.h.in
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/include/asterisk/autoconfig.h.in?view=diff&rev=399661&r1=399660&r2=399661
==============================================================================
--- team/group/performance/include/asterisk/autoconfig.h.in (original)
+++ team/group/performance/include/asterisk/autoconfig.h.in Tue Sep 24 10:58:10 2013
@@ -28,6 +28,9 @@
 
 /* Define to 1 if using `alloca.c'. */
 #undef C_ALLOCA
+
+/* Define to 1 if anonymous semaphores work. */
+#undef HAS_WORKING_SEMAPHORE
 
 /* Define to 1 if you have the `acos' function. */
 #undef HAVE_ACOS

Modified: team/group/performance/include/asterisk/stasis.h
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/include/asterisk/stasis.h?view=diff&rev=399661&r1=399660&r2=399661
==============================================================================
--- team/group/performance/include/asterisk/stasis.h (original)
+++ team/group/performance/include/asterisk/stasis.h Tue Sep 24 10:58:10 2013
@@ -884,16 +884,6 @@
  */
 int stasis_wait_init(void);
 
-struct ast_threadpool_options;
-
-/*!
- * \internal
- * \brief Retrieves the Stasis threadpool configuration.
- * \param[out] threadpool_options Filled with Stasis threadpool options.
- */
-void stasis_config_get_threadpool_options(
-	struct ast_threadpool_options *threadpool_options);
-
 /*! @} */
 
 /*!

Modified: team/group/performance/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/include/asterisk/taskprocessor.h?view=diff&rev=399661&r1=399660&r2=399661
==============================================================================
--- team/group/performance/include/asterisk/taskprocessor.h (original)
+++ team/group/performance/include/asterisk/taskprocessor.h Tue Sep 24 10:58:10 2013
@@ -109,6 +109,7 @@
 	 * \param listener The listener
 	 */
 	void (*shutdown)(struct ast_taskprocessor_listener *listener);
+	void (*dtor)(struct ast_taskprocessor_listener *listener);
 };
 
 /*!

Modified: team/group/performance/main/stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/stasis.c?view=diff&rev=399661&r1=399660&r2=399661
==============================================================================
--- team/group/performance/main/stasis.c (original)
+++ team/group/performance/main/stasis.c Tue Sep 24 10:58:10 2013
@@ -34,7 +34,6 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_internal.h"
 #include "asterisk/stasis.h"
-#include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
 #include "asterisk/utils.h"
 #include "asterisk/uuid.h"
@@ -134,9 +133,6 @@
 /*! The number of buckets to use for topic pools */
 #define TOPIC_POOL_BUCKETS 57
 
-/*! Threadpool for dispatching notifications to subscribers */
-static struct ast_threadpool *pool;
-
 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 
 /*! \internal */
@@ -286,7 +282,15 @@
 	ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
 
 	if (needs_mailbox) {
-		sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+		/* With a small number of subscribers, a thread-per-sub is
+		 * acceptable. If our usage changes so that we have larger
+		 * numbers of subscribers, we'll probably want to consider
+		 * a threadpool. We had that originally, but with so few
+		 * subscribers it was actually a performance loss instead of
+		 * a gain.
+		 */
+		sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+			TPS_REF_DEFAULT);
 		if (!sub->mailbox) {
 			return NULL;
 		}
@@ -731,13 +735,6 @@
 	ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
 }
 
-/*! \brief Shutdown function */
-static void stasis_exit(void)
-{
-	ast_threadpool_shutdown(pool);
-	pool = NULL;
-}
-
 /*! \brief Cleanup function for graceful shutdowns */
 static void stasis_cleanup(void)
 {
@@ -748,36 +745,14 @@
 {
 	int cache_init;
 
-	struct ast_threadpool_options opts;
-
 	/* Be sure the types are cleaned up after the message bus */
 	ast_register_cleanup(stasis_cleanup);
-	ast_register_atexit(stasis_exit);
-
-	if (stasis_config_init() != 0) {
-		ast_log(LOG_ERROR, "Stasis configuration failed\n");
-		return -1;
-	}
 
 	if (stasis_wait_init() != 0) {
 		ast_log(LOG_ERROR, "Stasis initialization failed\n");
 		return -1;
 	}
 
-	if (pool) {
-		ast_log(LOG_ERROR, "Stasis double-initialized\n");
-		return -1;
-	}
-
-	stasis_config_get_threadpool_options(&opts);
-	ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
-		opts.initial_size, opts.max_size, opts.idle_timeout);
-	pool = ast_threadpool_create("stasis-core", NULL, &opts);
-	if (!pool) {
-		ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
-		return -1;
-	}
-
 	cache_init = stasis_cache_init();
 	if (cache_init != 0) {
 		return -1;

Modified: team/group/performance/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/group/performance/main/taskprocessor.c?view=diff&rev=399661&r1=399660&r2=399661
==============================================================================
--- team/group/performance/main/taskprocessor.c (original)
+++ team/group/performance/main/taskprocessor.c Tue Sep 24 10:58:10 2013
@@ -37,6 +37,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/cli.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
 
 /*!
  * \brief tps_task structure is queued to a taskprocessor
@@ -113,9 +114,6 @@
 /*! \brief The astobj2 compare callback for taskprocessors */
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
 static void tps_taskprocessor_destroy(void *tps);
 
@@ -138,47 +136,56 @@
 
 struct default_taskprocessor_listener_pvt {
 	pthread_t poll_thread;
-	ast_mutex_t lock;
-	ast_cond_t cond;
-	int wake_up;
 	int dead;
+	struct ast_sem sem;
 };
 
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	pvt->wake_up = 1;
-	pvt->dead = should_die;
-	ast_cond_signal(&pvt->cond);
-}
-
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	while (!pvt->wake_up) {
-		ast_cond_wait(&pvt->cond, lock);
-	}
-	pvt->wake_up = 0;
-	return pvt->dead;
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+{
+	ast_assert(pvt->dead);
+	ast_sem_destroy(&pvt->sem);
+	ast_free(pvt);
+}
+
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
+{
+	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+	default_listener_pvt_destroy(pvt);
+
+	listener->user_data = NULL;
 }
 
 /*!
  * \brief Function that processes tasks in the taskprocessor
  * \internal
  */
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
 {
 	struct ast_taskprocessor_listener *listener = data;
 	struct ast_taskprocessor *tps = listener->tps;
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-	int dead = 0;
-
-	while (!dead) {
-		if (!ast_taskprocessor_execute(tps)) {
-			dead = default_tps_idle(pvt);
+	int sem_value;
+	int res;
+
+	while (!pvt->dead) {
+		res = ast_sem_wait(&pvt->sem);
+		if (res != 0 && errno != EINTR) {
+			ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+				strerror(errno));
+			/* Just give up */
+			break;
 		}
-	}
+		ast_taskprocessor_execute(tps);
+	}
+
+	/* No posting to a dead taskprocessor! */
+	res = ast_sem_getvalue(&pvt->sem, &sem_value);
+	ast_assert(res == 0 && sem_value == 0);
+
+	/* Free the shutdown reference */
+	ao2_ref(listener->tps, -1);
+
 	return NULL;
 }
 
@@ -186,7 +193,7 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+	if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
 		return -1;
 	}
 
@@ -197,33 +204,50 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-	ast_assert(!pvt->dead);
-
-	if (was_empty) {
-		default_tps_wake_up(pvt, 0);
-	}
-}
-
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
-{
-	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
-	ast_free(pvt);
+	if (ast_sem_post(&pvt->sem) != 0) {
+		ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+			strerror(errno));
+	}
+}
+
+static int default_listener_die(void *data)
+{
+	struct default_taskprocessor_listener_pvt *pvt = data;
+	pvt->dead = 1;
+	return 0;
 }
 
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-	default_tps_wake_up(pvt, 1);
-	pthread_join(pvt->poll_thread, NULL);
+	int res;
+
+	/* Hold a reference during shutdown */
+	ao2_ref(listener->tps, +1);
+
+	ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
+	if (pthread_self() == pvt->poll_thread) {
+		res = pthread_detach(pvt->poll_thread);
+		if (res != 0) {
+			ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+				strerror(errno));
+		}
+	} else {
+		res = pthread_join(pvt->poll_thread, NULL);
+		if (res != 0) {
+			ast_log(LOG_ERROR, "pthread_join(): %s\n",
+				strerror(errno));
+		}
+	}
 	pvt->poll_thread = AST_PTHREADT_NULL;
-	default_listener_pvt_destroy(pvt);
 }
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
 	.start = default_listener_start,
 	.task_pushed = default_task_pushed,
 	.shutdown = default_listener_shutdown,
+	.dtor = default_listener_pvt_dtor,
 };
 
 /*!
@@ -268,9 +292,7 @@
 /* release task resources */
 static void *tps_task_free(struct tps_task *task)
 {
-	if (task) {
-		ast_free(task);
-	}
+	ast_free(task);
 	return NULL;
 }
 
@@ -425,10 +447,8 @@
 	}
 	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
 	/* free it */
-	if (t->stats) {
-		ast_free(t->stats);
-		t->stats = NULL;
-	}
+	ast_free(t->stats);
+	t->stats = NULL;
 	ast_free((char *) t->name);
 	if (t->listener) {
 		ao2_ref(t->listener, -1);
@@ -443,7 +463,6 @@
 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
 {
 	struct tps_task *task;
-	SCOPED_AO2LOCK(lock, tps);
 
 	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
 		tps->tps_queue_size--;
@@ -472,10 +491,21 @@
 	ao2_ref(listener->tps, -1);
 }
 
+static void taskprocessor_listener_dtor(void *obj)
+{
+	struct ast_taskprocessor_listener *listener = obj;
+
+	if (listener->callbacks->dtor) {
+		listener->callbacks->dtor(listener);
+	}
+}
+
 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
 {
 	RAII_VAR(struct ast_taskprocessor_listener *, listener,
-			ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+			NULL, ao2_cleanup);
+
+	listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
 
 	if (!listener) {
 		return NULL;
@@ -506,9 +536,12 @@
 	if (!pvt) {
 		return NULL;
 	}
-	ast_cond_init(&pvt->cond, NULL);
-	ast_mutex_init(&pvt->lock);
 	pvt->poll_thread = AST_PTHREADT_NULL;
+	if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+		ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+		ast_free(pvt);
+		return NULL;
+	}
 	return pvt;
 }
 
@@ -590,7 +623,6 @@
 
 	p = __allocate_taskprocessor(name, listener);
 	if (!p) {
-		default_listener_pvt_destroy(pvt);
 		ao2_ref(listener, -1);
 		return NULL;
 	}
@@ -662,15 +694,17 @@
 	int size;
 
 	ao2_lock(tps);
+	t = tps_taskprocessor_pop(tps);
+	if (!t) {
+		ao2_unlock(tps);
+		return 0;
+	}
+
 	tps->executing = 1;
 	ao2_unlock(tps);
 
-	t = tps_taskprocessor_pop(tps);
-
-	if (t) {
-		t->execute(t->datap);
-		tps_task_free(t);
-	}
+	t->execute(t->datap);
+	tps_task_free(t);
 
 	ao2_lock(tps);
 	/* We need to check size in the same critical section where we reset the
@@ -680,7 +714,7 @@
 	tps->executing = 0;
 	size = tps_taskprocessor_depth(tps);
 	/* If we executed a task, bump the stats */
-	if (t && tps->stats) {
+	if (tps->stats) {
 		tps->stats->_tasks_processed_count++;
 		if (size > tps->stats->max_qsize) {
 			tps->stats->max_qsize = size;
@@ -689,7 +723,7 @@
 	ao2_unlock(tps);
 
 	/* If we executed a task, check for the transition to empty */
-	if (t && size == 0 && tps->listener->callbacks->emptied) {
+	if (size == 0 && tps->listener->callbacks->emptied) {
 		tps->listener->callbacks->emptied(tps->listener);
 	}
 	return size > 0;




More information about the asterisk-commits mailing list