[asterisk-commits] dlee: branch dlee/taskprocessor-optimization r399543 - in /team/dlee/taskproc...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Sep 20 12:27:31 CDT 2013


Author: dlee
Date: Fri Sep 20 12:27:27 2013
New Revision: 399543

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399543
Log:
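Replace the default taskprocessor listener's mutex/condition wake-up with a semaphore (new ast_sem wrapper, with a mutex/condition fallback for platforms where unnamed semaphores do not work). I can't believe this 1) works and 2) is faster.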

Added:
    team/dlee/taskprocessor-optimization/include/asterisk/sem.h   (with props)
    team/dlee/taskprocessor-optimization/main/sem.c   (with props)
Modified:
    team/dlee/taskprocessor-optimization/configure
    team/dlee/taskprocessor-optimization/configure.ac
    team/dlee/taskprocessor-optimization/include/asterisk/autoconfig.h.in
    team/dlee/taskprocessor-optimization/main/taskprocessor.c

Modified: team/dlee/taskprocessor-optimization/configure.ac
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/configure.ac?view=diff&rev=399543&r1=399542&r2=399543
==============================================================================
--- team/dlee/taskprocessor-optimization/configure.ac (original)
+++ team/dlee/taskprocessor-optimization/configure.ac Fri Sep 20 12:27:27 2013
@@ -808,11 +808,12 @@
 AC_MSG_RESULT(no)
 )
 
-AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 save_LIBS="$LIBS"
 save_CFLAGS="$CFLAGS"
 LIBS="$PTHREAD_LIBS $LIBS"
 CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
 AC_LINK_IFELSE(
   [AC_LANG_PROGRAM(
     [#include <pthread.h>
@@ -826,6 +827,17 @@
     ac_cv_pthread_rwlock_timedwrlock="no"
   ]
 )
+
+# Some platforms define sem_init(), but only support sem_open(), so the call fails at run time; run it to find out. joyous.
+AC_MSG_CHECKING(for working unnamed semaphores)
+AC_RUN_IFELSE(
+	[AC_LANG_PROGRAM([#include <semaphore.h>],
+		[sem_t sem; return sem_init(&sem, 0, 0);])],
+	AC_MSG_RESULT(yes)
+	AC_DEFINE([HAS_WORKING_SEMAPHORE], 1, [Define to 1 if anonymous semaphores work.]),
+	AC_MSG_RESULT(no)
+)
+
 LIBS="$save_LIBS"
 CFLAGS="$save_CFLAGS"
 if test "${ac_cv_pthread_rwlock_timedwrlock}" = "yes"; then

Modified: team/dlee/taskprocessor-optimization/include/asterisk/autoconfig.h.in
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/include/asterisk/autoconfig.h.in?view=diff&rev=399543&r1=399542&r2=399543
==============================================================================
--- team/dlee/taskprocessor-optimization/include/asterisk/autoconfig.h.in (original)
+++ team/dlee/taskprocessor-optimization/include/asterisk/autoconfig.h.in Fri Sep 20 12:27:27 2013
@@ -28,6 +28,9 @@
 
 /* Define to 1 if using `alloca.c'. */
 #undef C_ALLOCA
+
+/* Define to 1 if anonymous semaphores work. */
+#undef HAS_WORKING_SEMAPHORE
 
 /* Define to 1 if you have the `acos' function. */
 #undef HAVE_ACOS

Added: team/dlee/taskprocessor-optimization/include/asterisk/sem.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/include/asterisk/sem.h?view=auto&rev=399543
==============================================================================
--- team/dlee/taskprocessor-optimization/include/asterisk/sem.h (added)
+++ team/dlee/taskprocessor-optimization/include/asterisk/sem.h Fri Sep 20 12:27:27 2013
@@ -1,0 +1,54 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef ASTERISK_SEMAPHORE_H
+#define ASTERISK_SEMAPHORE_H
+
+#ifdef HAS_WORKING_SEMAPHORE
+/* configure's run test found that sem_init() works: wrap a POSIX semaphore. */
+#include <semaphore.h>
+/*! \brief Semaphore handle; in this build a thin wrapper around sem_t. */
+struct ast_sem {
+	sem_t real_sem;	/*!< Underlying POSIX semaphore */
+};
+
+#else
+/* No working unnamed semaphores: emulate one with a mutex and a condition. */
+#include "asterisk/lock.h"
+/*! \brief Semaphore handle; in this build a counter plus a mutex/condition pair. */
+struct ast_sem {
+	int count;	/*!< Current semaphore value; ast_sem_wait() blocks while it is 0 */
+	int waiters;	/*!< Number of threads currently blocked in ast_sem_wait() */
+	ast_mutex_t mutex;	/*!< Taken by post/wait/getvalue around accesses to count and waiters */
+	ast_cond_t cond;	/*!< Signalled by ast_sem_post() when waiters is non-zero */
+};
+
+#endif
+/*! \brief Initialize \a sem. The native build forwards \a pshared and \a value to sem_init(); NOTE(review): confirm the fallback honors them. */
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value);
+/*! \brief Destroy \a sem (sem_destroy() natively; the fallback destroys its mutex and condition). */
+int ast_sem_destroy(struct ast_sem *sem);
+/*! \brief Increment \a sem; the fallback also signals its condition when threads are waiting. */
+int ast_sem_post(struct ast_sem *sem);
+/*! \brief Decrement \a sem; the fallback blocks on its condition while the count is 0. */
+int ast_sem_wait(struct ast_sem *sem);
+/*! \brief Store the current value of \a sem in \a sval. */
+int ast_sem_getvalue(struct ast_sem *sem, int *sval);
+
+
+#endif /* ASTERISK_SEMAPHORE_H */

Propchange: team/dlee/taskprocessor-optimization/include/asterisk/sem.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/taskprocessor-optimization/include/asterisk/sem.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/taskprocessor-optimization/include/asterisk/sem.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dlee/taskprocessor-optimization/main/sem.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/main/sem.c?view=auto&rev=399543
==============================================================================
--- team/dlee/taskprocessor-optimization/main/sem.c (added)
+++ team/dlee/taskprocessor-optimization/main/sem.c Fri Sep 20 12:27:27 2013
@@ -1,0 +1,117 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * David M. Lee, II <dlee at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Asterisk semaphore support.
+ */
+
+#include "asterisk.h"
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#include "asterisk/sem.h"
+#include "asterisk/utils.h"
+
+#ifdef HAS_WORKING_SEMAPHORE
+/*! \brief Native build: forward all three arguments to sem_init() and return its result. */
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+	return sem_init(&sem->real_sem, pshared, value);
+}
+/*! \brief Native build: return the result of sem_destroy() on the wrapped semaphore. */
+int ast_sem_destroy(struct ast_sem *sem)
+{
+	return sem_destroy(&sem->real_sem);
+}
+/*! \brief Native build: return the result of sem_post() on the wrapped semaphore. */
+int ast_sem_post(struct ast_sem *sem)
+{
+	return sem_post(&sem->real_sem);
+}
+/*! \brief Native build: return the result of sem_wait() on the wrapped semaphore. */
+int ast_sem_wait(struct ast_sem *sem)
+{
+	return sem_wait(&sem->real_sem);
+}
+/*! \brief Native build: return the result of sem_getvalue(), which fills in \a sval. */
+int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+	return sem_getvalue(&sem->real_sem, sval);
+}
+
+#else
+/*! \brief Fallback init: start the count at \a value and set up the mutex/condition. \a pshared is ignored (NOTE(review): no process-shared support here). Always returns 0. */
+int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
+{
+	sem->count = value;	/* honor the caller's initial value, as sem_init() does; values above INT_MAX are not range-checked */
+	sem->waiters = 0;
+	ast_mutex_init(&sem->mutex);
+	ast_cond_init(&sem->cond, NULL);
+	return 0;
+}
+/*! \brief Fallback destroy: destroy the mutex and the condition. Always returns 0. */
+int ast_sem_destroy(struct ast_sem *sem)
+{
+	ast_mutex_destroy(&sem->mutex);
+	ast_cond_destroy(&sem->cond);
+	return 0;
+}
+/*! \brief Fallback post: increment the count and signal the condition if any thread is waiting. Always returns 0. */
+int ast_sem_post(struct ast_sem *sem)
+{
+	SCOPED_MUTEX(lock, &sem->mutex);
+
+	ast_assert(sem->count >= 0);
+
+	++sem->count;
+	/* Skip the signal when no thread is blocked in ast_sem_wait(). */
+	if (sem->waiters) {
+		ast_cond_signal(&sem->cond);
+	}
+
+	return 0;
+}
+/*! \brief Fallback wait: block on the condition until the count is non-zero, then decrement it. Always returns 0. */
+int ast_sem_wait(struct ast_sem *sem)
+{
+	SCOPED_MUTEX(lock, &sem->mutex);
+
+	ast_assert(sem->count >= 0);
+	/* Re-test the count after every wake-up; waiters is raised only around the blocking call. */
+	while (sem->count == 0) {
+		++sem->waiters;
+		ast_cond_wait(&sem->cond, &sem->mutex);
+		--sem->waiters;
+	}
+
+	--sem->count;
+
+	return 0;
+}
+/*! \brief Fallback getvalue: copy the current count into \a sval. Always returns 0. */
+int ast_sem_getvalue(struct ast_sem *sem, int *sval)
+{
+	SCOPED_MUTEX(lock, &sem->mutex);
+
+	ast_assert(sem->count >= 0);
+	*sval = sem->count;
+	return 0;
+}
+
+#endif

Propchange: team/dlee/taskprocessor-optimization/main/sem.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dlee/taskprocessor-optimization/main/sem.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dlee/taskprocessor-optimization/main/sem.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: team/dlee/taskprocessor-optimization/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/taskprocessor-optimization/main/taskprocessor.c?view=diff&rev=399543&r1=399542&r2=399543
==============================================================================
--- team/dlee/taskprocessor-optimization/main/taskprocessor.c (original)
+++ team/dlee/taskprocessor-optimization/main/taskprocessor.c Fri Sep 20 12:27:27 2013
@@ -37,6 +37,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/cli.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
 
 /*!
  * \brief tps_task structure is queued to a taskprocessor
@@ -113,9 +114,6 @@
 /*! \brief The astobj2 compare callback for taskprocessors */
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
 static void tps_taskprocessor_destroy(void *tps);
 
@@ -138,47 +136,44 @@
 
 struct default_taskprocessor_listener_pvt {
 	pthread_t poll_thread;
-	ast_mutex_t lock;
-	ast_cond_t cond;
-	int wake_up;
 	int dead;
+	struct ast_sem sem;	/*!< Posted when a task is enqueued; default_tps_processing_function() waits on it */
 };
 
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	pvt->wake_up = 1;
-	pvt->dead = should_die;
-	ast_cond_signal(&pvt->cond);
-}
-
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	while (!pvt->wake_up) {
-		ast_cond_wait(&pvt->cond, lock);
-	}
-	pvt->wake_up = 0;
-	return pvt->dead;
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+{	/* Destroy the semaphore and free pvt. */
+	ast_assert(pvt->dead);	/* dead is set by the default_listener_die() task */
+	ast_sem_destroy(&pvt->sem);
+	ast_free(pvt);
 }
 
 /*!
  * \brief Function that processes tasks in the taskprocessor
  * \internal
  */
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
 {
 	struct ast_taskprocessor_listener *listener = data;
 	struct ast_taskprocessor *tps = listener->tps;
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-	int dead = 0;
-
-	while (!dead) {
-		if (!ast_taskprocessor_execute(tps)) {
-			dead = default_tps_idle(pvt);
+	int sem_value;
+	int res;
+	/* Each successful wait consumes one post; one ast_taskprocessor_execute() call follows it. */
+	while (!pvt->dead) {
+		res = ast_sem_wait(&pvt->sem);
+		if (res != 0 && errno != EINTR) { /* NOTE(review): on EINTR this falls through to execute without having consumed a post */
+			ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+				strerror(errno));
+			/* Just give up */
+			break;
 		}
-	}
+		ast_taskprocessor_execute(tps);
+	}
+	/* NOTE(review): the error break above exits with pvt->dead unset, yet default_listener_pvt_destroy() asserts it. */
+	/* No posting to a dead taskprocessor! */
+	res = ast_sem_getvalue(&pvt->sem, &sem_value);
+	ast_assert(res == 0 && sem_value == 0);
+
 	return NULL;
 }
 
@@ -186,7 +181,7 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+	if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
 		return -1;
 	}
 
@@ -197,24 +192,25 @@
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
 
-	ast_assert(!pvt->dead);
-
-	if (was_empty) {
-		default_tps_wake_up(pvt, 0);
-	}
-}
-
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
-{
-	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
-	ast_free(pvt);
+	if (ast_sem_post(&pvt->sem) != 0) {
+		ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+			strerror(errno));
+	}
+}
+/*! \brief Task callback that sets pvt->dead, ending the while (!pvt->dead) loop in default_tps_processing_function(); pushed by default_listener_shutdown(). */
+static int default_listener_die(void *data)
+{
+	struct default_taskprocessor_listener_pvt *pvt = data;
+	pvt->dead = 1;
+	return 0;
 }
 
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
-	default_tps_wake_up(pvt, 1);
+
+	ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
 	pthread_join(pvt->poll_thread, NULL);
 	pvt->poll_thread = AST_PTHREADT_NULL;
 	default_listener_pvt_destroy(pvt);
@@ -510,9 +506,12 @@
 	if (!pvt) {
 		return NULL;
 	}
-	ast_cond_init(&pvt->cond, NULL);
-	ast_mutex_init(&pvt->lock);
 	pvt->poll_thread = AST_PTHREADT_NULL;
+	if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+		ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+		ast_free(pvt);
+		return NULL;
+	}
 	return pvt;
 }
 




More information about the asterisk-commits mailing list