[asterisk-commits] russell: branch group/timing r122591 - /team/group/timing/res/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Jun 13 11:46:20 CDT 2008


Author: russell
Date: Fri Jun 13 11:46:19 2008
New Revision: 122591

URL: http://svn.digium.com/view/asterisk?view=rev&rev=122591
Log:
Check in progress.  The code is "complete", but not yet tested.

Thanks again to Josh for the timing logic I stole from bridge_softmix

Modified:
    team/group/timing/res/res_timing_pthread.c

Modified: team/group/timing/res/res_timing_pthread.c
URL: http://svn.digium.com/view/asterisk/team/group/timing/res/res_timing_pthread.c?view=diff&rev=122591&r1=122590&r2=122591
==============================================================================
--- team/group/timing/res/res_timing_pthread.c (original)
+++ team/group/timing/res/res_timing_pthread.c Fri Jun 13 11:46:19 2008
@@ -30,11 +30,14 @@
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 
 #include <math.h>
+#include <sys/select.h>
 
 #include "asterisk/module.h"
 #include "asterisk/timing.h"
 #include "asterisk/utils.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/time.h"
+#include "asterisk/lock.h"
 
 static void *timing_funcs_handle;
 
@@ -56,6 +59,10 @@
 	.timer_get_event = pthread_timer_get_event,
 };
 
+/* 1 tick / 20 ms */
+#define TIMING_INTERVAL 20
+#define MAX_RATE 50
+
 static struct ao2_container *pthread_timers;
 #define PTHREAD_TIMER_BUCKETS 563
 
@@ -74,10 +81,15 @@
 	int pipe[2];
 	enum pthread_timer_state state;
 	unsigned int rate;
+	/*! Interval in ms for current rate */
+	unsigned int interval;
+	struct timeval last_tick;
 };
 
 static void pthread_timer_destructor(void *obj);
 static struct pthread_timer *find_timer(int handle, int unlink);
+static void write_byte(int wr_fd);
+static void read_pipe(int rd_fd, unsigned int num, int clear);
 
 static int pthread_timer_open(void)
 {
@@ -121,9 +133,17 @@
 		return -1;
 	}
 
+	/* Rate 0 (idle) and 1..MAX_RATE are accepted; only a rate above the cap is an error. */
+	if (rate > MAX_RATE) {
+		ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a max rate of %d / sec\n", MAX_RATE);
+		errno = EINVAL;
+		return -1;
+	}
+
 	ao2_lock(timer);
+	timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
 	timer->rate = rate;
-	timer->state = rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
+	timer->interval = rate ? roundf(1000.0 / ((float) rate)) : 0;
 	ao2_unlock(timer);
 
 	ao2_ref(timer, -1);
@@ -134,7 +154,6 @@
 static void pthread_timer_ack(int handle, unsigned int quantity)
 {
 	struct pthread_timer *timer;
-	ssize_t res;
 
 	ast_assert(quantity > 0);
 
@@ -142,10 +161,176 @@
 		return;
 	}
 
+	/* In continuous mode, leave the pipe alone, please! */
+	if (timer->state != TIMER_STATE_CONTINUOUS) {
+		read_pipe(timer->pipe[PIPE_READ], quantity, 0);
+	}
+
+	/* Drop the find_timer() reference on every path; returning early
+	 * for a continuous timer would skip this and leak the reference. */
+	ao2_ref(timer, -1);
+}
+
+/*! \brief Put the timer for \a handle into continuous mode and write one byte to its pipe. */
+static int pthread_timer_enable_continuous(int handle)
+{
+	struct pthread_timer *t = find_timer(handle, 0);
+
+	if (t == NULL) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* The state change and the pipe write are done together under the object lock. */
+	ao2_lock(t);
+	t->state = TIMER_STATE_CONTINUOUS;
+	write_byte(t->pipe[PIPE_WRITE]);
+	ao2_unlock(t);
+	ao2_ref(t, -1);
+	return 0;
+}
+
+/*! \brief Take the timer for \a handle out of continuous mode and ask read_pipe() to clear its pipe. */
+static int pthread_timer_disable_continuous(int handle)
+{
+	struct pthread_timer *t = find_timer(handle, 0);
+
+	if (t == NULL) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	ao2_lock(t);
+	/* Go back to ticking when a rate is set, otherwise to idle. */
+	t->state = t->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
+	read_pipe(t->pipe[PIPE_READ], 0, 1);
+	ao2_unlock(t);
+	ao2_ref(t, -1);
+	return 0;
+}
+
+/*! \brief AST_TIMING_EVENT_CONTINUOUS for a continuous timer; AST_TIMING_EVENT_EXPIRED otherwise or if not found. */
+static enum ast_timing_event pthread_timer_get_event(int handle)
+{
+	enum ast_timing_event event;
+	struct pthread_timer *t = find_timer(handle, 0);
+
+	if (t == NULL) {
+		return AST_TIMING_EVENT_EXPIRED;
+	}
+
+	event = (t->state == TIMER_STATE_CONTINUOUS)
+		? AST_TIMING_EVENT_CONTINUOUS
+		: AST_TIMING_EVENT_EXPIRED;
+	ao2_ref(t, -1);
+
+	return event;
+}
+
+/*!
+ * \brief Look up a timer in pthread_timers by its read-side pipe fd (the handle).
+ * \return what ao2_find() returned, or NULL (after a failed assertion) when nothing matches
+ */
+static struct pthread_timer *find_timer(int handle, int unlink)
+{
+	struct pthread_timer key;
+	struct pthread_timer *timer;
+	const int flags = unlink ? (OBJ_POINTER | OBJ_UNLINK) : OBJ_POINTER;
+
+	/* Only the read fd is set: pthread_timer_hash() and pthread_timer_cmp() read nothing else. */
+	key.pipe[PIPE_READ] = handle;
+	timer = ao2_find(pthread_timers, &key, flags);
+	if (timer == NULL) {
+		ast_assert(timer != NULL);
+		return NULL;
+	}
+	return timer;
+}
+
+/*! \brief Close whichever ends of the timer's pipe are still open and mark them closed (-1). */
+static void pthread_timer_destructor(void *obj)
+{
+	struct pthread_timer *timer = obj;
+	const int ends[] = { PIPE_READ, PIPE_WRITE };
+	size_t i;
+	for (i = 0; i < sizeof(ends) / sizeof(ends[0]); i++) {
+		int *fd = &timer->pipe[ends[i]];
+		if (*fd > -1) {
+			close(*fd);
+			*fd = -1;
+		}
+	}
+}
+
+/*!
+ * \brief Hash a timer by its read-side pipe fd.
+ * \note Only pipe[PIPE_READ] may be read here: find_timer() builds its
+ *       lookup key with just that field set.
+ */
+static int pthread_timer_hash(const void *obj, const int flags)
+{
+	return ((const struct pthread_timer *) obj)->pipe[PIPE_READ];
+}
+
+/*!
+ * \brief Two timers match (CMP_MATCH) when their read-side pipe fds are equal.
+ * \note Only pipe[PIPE_READ] is compared; find_timer() keys set nothing else.
+ */
+static int pthread_timer_cmp(void *obj, void *arg, int flags)
+{
+	const struct pthread_timer *lhs = obj, *rhs = arg;
+	return (lhs->pipe[PIPE_READ] != rhs->pipe[PIPE_READ]) ? 0 : CMP_MATCH;
+}
+
+/*!
+ * \brief Report whether a ticking timer's interval has elapsed, restarting the interval if so.
+ * \retval 0 no timer tick needed
+ * \retval non-zero write to the timing pipe needed
+ */
+static int check_timer(struct pthread_timer *timer)
+{
+	struct timeval current;
+
+	/* Idle and continuous timers never get a tick from here. */
+	if (timer->state == TIMER_STATE_IDLE || timer->state == TIMER_STATE_CONTINUOUS) {
+		return 0;
+	}
+
+	current = ast_tvnow();
+	if (ast_tvdiff_ms(current, timer->last_tick) < timer->interval) {
+		return 0;
+	}
+	timer->last_tick = current;
+	return 1;
+}
+
+static void read_pipe(int rd_fd, unsigned int quantity, int clear)
+{
+
+	ast_assert(quantity || clear);
+
+	if (!quantity && clear) {
+		quantity = 1;
+	}
+
 	do {
 		unsigned char buf[1024];
-
-		res = read(timer->pipe[PIPE_READ], buf, quantity);
+		ssize_t res;
+		fd_set rfds;
+		struct timeval tv = {
+			.tv_sec = 0,
+		};
+
+		/* Make sure there is data to read */
+		FD_ZERO(&rfds);
+		FD_SET(rd_fd, &rfds);
+
+		if (select(rd_fd + 1, &rfds, NULL, NULL, &tv) != 1) {
+			break;
+		}
+
+		res = read(rd_fd, buf, 
+			(quantity < sizeof(buf)) ? quantity : sizeof(buf));
 
 		if (res == -1) {
 			if (errno == EAGAIN) {
@@ -155,116 +340,114 @@
 			break;
 		}
 
+		if (clear) {
+			continue;
+		}
+
 		quantity -= res;
-
 	} while (quantity);
-
-	ao2_ref(timer, -1);
-}
-
-static int pthread_timer_enable_continuous(int handle)
-{
-	struct pthread_timer *timer;
-
-	if (!(timer = find_timer(handle, 0))) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	timer->state = TIMER_STATE_CONTINUOUS;
-
-	ao2_ref(timer, -1);
-
-	return 0;
-}
-
-static int pthread_timer_disable_continuous(int handle)
-{
-	struct pthread_timer *timer;
-
-	if (!(timer = find_timer(handle, 0))) {
-		errno = EINVAL;
-		return -1;
-	}
-
-	timer->state = timer->rate ? TIMER_STATE_TICKING : TIMER_STATE_IDLE;
-
-	ao2_ref(timer, -1);
-
-	return 0;
-}
-
-static enum ast_timing_event pthread_timer_get_event(int handle)
-{
-	struct pthread_timer *timer;
-	enum ast_timing_event res = AST_TIMING_EVENT_EXPIRED;
-
-	if (!(timer = find_timer(handle, 0))) {
-		return res;
-	}
-
-	if (timer->state == TIMER_STATE_CONTINUOUS) {
-		res = AST_TIMING_EVENT_CONTINUOUS;
-	}
-
-	ao2_ref(timer, -1);
-
-	return res;
-}
-
-static struct pthread_timer *find_timer(int handle, int unlink)
-{
-	struct pthread_timer *timer;
-	struct pthread_timer tmp_timer;
-	int flags = OBJ_POINTER;
-
-	tmp_timer.pipe[PIPE_READ] = handle;
-
-	if (unlink) {
-		flags |= OBJ_UNLINK;
-	}
-
-	if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
-		ast_assert(timer != NULL);
-		return NULL;
-	}
-
-	return timer;
-}
-
-static void pthread_timer_destructor(void *obj)
+}
+
+/*! \brief Write one tick byte to a timing pipe.
+ *  Retries while write() fails with EAGAIN; any other failure is logged. */
+static void write_byte(int wr_fd)
+{
+	ssize_t res;
+	unsigned char x = 42;
+
+	/* A "continue" inside do { } while (0) exits the loop, so retry with a real loop condition. */
+	do {
+		res = write(wr_fd, &x, 1);
+	} while (res == -1 && errno == EAGAIN);
+
+	if (res == -1) {
+		ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n", strerror(errno));
+	}
+}
+
+static int run_timer(void *obj, void *arg, int flags)
 {
 	struct pthread_timer *timer = obj;
 
-	if (timer->pipe[PIPE_READ] > -1) {
-		close(timer->pipe[PIPE_READ]);
-		timer->pipe[PIPE_READ] = -1;
-	}
-
-	if (timer->pipe[PIPE_WRITE] > -1) {
-		close(timer->pipe[PIPE_WRITE]);
-		timer->pipe[PIPE_WRITE] = -1;
-	}
+	if (timer->state == TIMER_STATE_IDLE) {
+		return 0;
+	}
+
+	ao2_lock(timer);
+
+	if (check_timer(timer)) {
+		write_byte(timer->pipe[PIPE_WRITE]);
+	}
+	
+	ao2_unlock(timer);
+
+	return 0;
 }
 
 /*!
- * \note only PIPE_READ is guaranteed valid 
- */
-static int pthread_timer_hash(const void *obj, const int flags)
-{
-	const struct pthread_timer *timer = obj;
-
-	return timer->pipe[PIPE_READ];
-}
-
-/*!
- * \note only PIPE_READ is guaranteed valid 
- */
-static int pthread_timer_cmp(void *obj, void *arg, int flags)
-{
-	struct pthread_timer *timer1 = obj, *timer2 = arg;
-
-	return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH : 0;
+ * \brief Data for the timing thread
+ */
+static struct {
+	pthread_t thread;
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	unsigned int stop:1;
+} timing_thread;
+
+static void *do_timing(void *arg)
+{
+	struct timeval base = ast_tvnow();
+	int skew = 0, previous_skew = 0, last_adjust = 0;
+	/* Thread body: service every timer, wait for the next wakeup, repeat until stop is set. */
+	while (!timing_thread.stop) {
+		struct timeval next_wakeup, now;
+		struct timespec ts = { 0, };
+		/* Run run_timer() over the pthread_timers container. */
+		ao2_callback(pthread_timers, 0, run_timer, NULL);
+		/* Advance base: 2 * TIMING_INTERVAL ms, less new_adjust once two non-zero skews exist. */
+		if (previous_skew && skew) {
+			int adjust = abs(skew - previous_skew), new_adjust = 0;
+
+			if (adjust > 0) {
+				new_adjust = abs(adjust - last_adjust);
+			}
+			last_adjust = new_adjust;
+			/* NOTE(review): both branches step by about two TIMING_INTERVALs, not one - confirm intended. */
+			base = ast_tvadd(base, ast_tv(0, ((TIMING_INTERVAL - new_adjust) + TIMING_INTERVAL) * 1000));
+		} else {
+			base = ast_tvadd(base, ast_tv(0, (TIMING_INTERVAL * 2) * 1000));
+		}
+		/* next_wakeup is now + (base - now); it is then copied into a timespec (usec -> nsec). */
+		now = ast_tvnow();
+		next_wakeup = ast_tvadd(now, ast_tvsub(base, now));
+
+		ts.tv_sec = next_wakeup.tv_sec;
+		ts.tv_nsec = next_wakeup.tv_usec * 1000;
+		/* Timed wait on the condition with ts, skipped if stop was set in the meantime. */
+		ast_mutex_lock(&timing_thread.lock);
+		if (!timing_thread.stop) {
+			ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
+		}
+		ast_mutex_unlock(&timing_thread.lock);
+		/* Keep the previous skew and measure the new one: ms between now and next_wakeup. */
+		previous_skew = skew;
+		skew = ast_tvdiff_ms(ast_tvnow(), next_wakeup);
+	}
+
+	return NULL;
+}
+
+/*! \brief Set up the timing thread's lock and condition, then start do_timing(); 0 on success, -1 on failure. */
+static int init_timing_thread(void)
+{
+	int failed;
+	ast_mutex_init(&timing_thread.lock);
+	ast_cond_init(&timing_thread.cond, NULL);
+	failed = ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL);
+	if (failed) {
+		ast_log(LOG_ERROR, "Unable to start timing thread.\n");
+	}
+	return failed ? -1 : 0;
 }
 
 static int load_module(void)
@@ -274,6 +457,12 @@
 		return AST_MODULE_LOAD_DECLINE;
 	}
 
+	if (init_timing_thread()) {
+		ao2_ref(pthread_timers, -1);
+		pthread_timers = NULL;
+		return AST_MODULE_LOAD_DECLINE;
+	}
+
 	return (timing_funcs_handle = ast_install_timing_functions(&pthread_timing_functions)) ?
 		AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
 }
@@ -281,6 +470,8 @@
 static int unload_module(void)
 {
 #if 0
+	/* XXX code to stop the timing thread ... */
+
 	ast_uninstall_timing_functions(timing_funcs_handle);
 	ao2_ref(pthread_timers, -1);
 #endif




More information about the asterisk-commits mailing list