[asterisk-commits] mjordan: trunk r433575 - in /trunk: ./ res/res_timing_kqueue.c

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Mar 27 09:41:48 CDT 2015


Author: mjordan
Date: Fri Mar 27 09:41:46 2015
New Revision: 433575

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=433575
Log:
res/res_timing_kqueue: Update the module to conform to current timer API

This patch updates the kqueue timing module to conform to current timer API.

This fixes issues with using the kqueue timing source on Asterisk 13 on
FreeBSD 10. These issues include:

- Remove support for kevent64().  The values used to support Asterisk timers
  fit within 32bits and so can be handled on all platforms via kevent().

- Provide debug logging for, but do not track, unacked events.  This matches
  the behavior of all other timer implementations.

- Implement continuous mode by triggering and leaving active, a user event.
  This ensures that the file descriptor for the timer returns immediately from
  poll(), without placing the load of a high speed timer on the kernel.

- In kqueue_timer_get_max_rate(), don't overstate the capability of the timer.
  On some platforms, UINT_MAX is greater than INTPTR_MAX, the largest integer
  type kqueue supports for timers.

- In kqueue_timer_get_event(), assume the caller woke up from poll() and just
  return the mode the timer is currently in. This matches all other timer
  implementations.

- Adjust the test code now that unacked events are not tracked.

Review: https://reviewboard.asterisk.org/r/4465/

ASTERISK-24857 #close
Reported by: scsiguy
Tested by: Ed Hynan
patches:
  rb4465.patch submitted by scsiguy (License 6692)
........

Merged revisions 433574 from http://svn.asterisk.org/svn/asterisk/branches/13

Modified:
    trunk/   (props changed)
    trunk/res/res_timing_kqueue.c

Propchange: trunk/
------------------------------------------------------------------------------
Binary property 'branch-13-merged' - no diff available.

Modified: trunk/res/res_timing_kqueue.c
URL: http://svnview.digium.com/svn/asterisk/trunk/res/res_timing_kqueue.c?view=diff&rev=433575&r1=433574&r2=433575
==============================================================================
--- trunk/res/res_timing_kqueue.c (original)
+++ trunk/res/res_timing_kqueue.c Fri Mar 27 09:41:46 2015
@@ -73,15 +73,92 @@
 };
 
 struct kqueue_timer {
+	intptr_t period;
 	int handle;
-	uint64_t nsecs;
-	uint64_t unacked;
+#ifndef EVFILT_USER
+	int continuous_fd;
+	unsigned int continuous_fd_valid:1;
+#endif
 	unsigned int is_continuous:1;
 };
 
+#ifdef EVFILT_USER
+#define CONTINUOUS_EVFILT_TYPE EVFILT_USER
+static int kqueue_timer_init_continuous_event(struct kqueue_timer *timer)
+{
+	return 0;
+}
+
+static int kqueue_timer_enable_continuous_event(struct kqueue_timer *timer)
+{
+	struct kevent kev[2];
+
+	EV_SET(&kev[0], (uintptr_t)timer, EVFILT_USER, EV_ADD | EV_ENABLE,
+		0, 0, NULL);
+	EV_SET(&kev[1], (uintptr_t)timer, EVFILT_USER, 0, NOTE_TRIGGER,
+		0, NULL);
+	return kevent(timer->handle, kev, 2, NULL, 0, NULL);
+}
+
+static int kqueue_timer_disable_continuous_event(struct kqueue_timer *timer)
+{
+	struct kevent kev;
+
+	EV_SET(&kev, (uintptr_t)timer, EVFILT_USER, EV_DELETE, 0, 0, NULL);
+	return kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+}
+
+static void kqueue_timer_fini_continuous_event(struct kqueue_timer *timer)
+{
+}
+
+#else /* EVFILT_USER */
+
+#define CONTINUOUS_EVFILT_TYPE EVFILT_READ
+static int kqueue_timer_init_continuous_event(struct kqueue_timer *timer)
+{
+	int pipefds[2];
+	int retval;
+
+	retval = pipe(pipefds);
+	if (retval == 0) {
+		timer->continuous_fd = pipefds[0];
+		timer->continuous_fd_valid = 1;
+		close(pipefds[1]);
+	}
+	return retval;
+}
+
+static void kqueue_timer_fini_continuous_event(struct kqueue_timer *timer)
+{
+	if (timer->continuous_fd_valid) {
+		close(timer->continuous_fd);
+	}
+}
+
+static int kqueue_timer_enable_continuous_event(struct kqueue_timer *timer)
+{
+	struct kevent kev;
+
+	EV_SET(&kev, timer->continuous_fd, EVFILT_READ, EV_ADD | EV_ENABLE,
+		0, 0, NULL);
+	return kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+}
+
+static int kqueue_timer_disable_continuous_event(struct kqueue_timer *timer)
+{
+	struct kevent kev;
+
+	EV_SET(&kev, timer->continuous_fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+	return kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+}
+#endif
+
 static void timer_destroy(void *obj)
 {
 	struct kqueue_timer *timer = obj;
+	ast_debug(5, "[%d]: Timer Destroy\n", timer->handle);
+	kqueue_timer_fini_continuous_event(timer);
 	close(timer->handle);
 }
 
@@ -90,15 +167,24 @@
 	struct kqueue_timer *timer;
 
 	if (!(timer = ao2_alloc(sizeof(*timer), timer_destroy))) {
-		ast_log(LOG_ERROR, "Could not allocate memory for kqueue_timer structure\n");
+		ast_log(LOG_ERROR, "Alloc failed for kqueue_timer structure\n");
 		return NULL;
 	}
+
 	if ((timer->handle = kqueue()) < 0) {
-		ast_log(LOG_ERROR, "Failed to create kqueue timer: %s\n", strerror(errno));
+		ast_log(LOG_ERROR, "Failed to create kqueue fd: %s\n",
+			strerror(errno));
 		ao2_ref(timer, -1);
 		return NULL;
 	}
 
+	if (kqueue_timer_init_continuous_event(timer) != 0) {
+		ast_log(LOG_ERROR, "Failed to create continuous event: %s\n",
+			strerror(errno));
+		ao2_ref(timer, -1);
+		return NULL;
+	}
+	ast_debug(5, "[%d]: Create timer\n", timer->handle);
 	return timer;
 }
 
@@ -106,108 +192,187 @@
 {
 	struct kqueue_timer *timer = data;
 
+	ast_debug(5, "[%d]: Timer Close\n", timer->handle);
 	ao2_ref(timer, -1);
 }
 
-static void kqueue_set_nsecs(struct kqueue_timer *our_timer, uint64_t nsecs)
-{
-	struct timespec nowait = { 0, 1 };
-#ifdef HAVE_KEVENT64
-	struct kevent64_s kev;
-
-	EV_SET64(&kev, our_timer->handle, EVFILT_TIMER, EV_ADD | EV_ENABLE, NOTE_NSECONDS,
-		nsecs, 0, 0, 0);
-	kevent64(our_timer->handle, &kev, 1, NULL, 0, 0, &nowait);
-#else
+/*
+ * Use the highest precision available that does not overflow
+ * the datatype kevent is using for time.
+ */
+static intptr_t kqueue_scale_period(unsigned int period_ns, int *units)
+{
+	uint64_t period = period_ns;
+	*units = 0;
+#ifdef NOTE_NSECONDS
+	if (period < INTPTR_MAX) {
+		*units = NOTE_NSECONDS;
+	} else {
+#ifdef NOTE_USECONDS
+		period /= 1000;
+		if (period < INTPTR_MAX) {
+			*units = NOTE_USECONDS;
+		} else {
+			period /= 1000;
+#ifdef NOTE_MSECONDS
+			*units = NOTE_MSECONDS;
+#endif	/* NOTE_MSECONDS */
+		}
+#else	/* NOTE_USECONDS */
+		period /= 1000000;
+#ifdef NOTE_MSECONDS
+		*units = NOTE_MSECONDS;
+#endif	/* NOTE_MSECONDS */
+#endif	/* NOTE_USECONDS */
+	}
+#else	/* NOTE_NSECONDS */
+	period /= 1000000;
+#endif
+	if (period > INTPTR_MAX) {
+		period = INTPTR_MAX;
+	}
+	return period;
+}
+
+static int kqueue_timer_set_rate(void *data, unsigned int rate)
+{
 	struct kevent kev;
-
-	EV_SET(&kev, our_timer->handle, EVFILT_TIMER, EV_ADD | EV_ENABLE,
-#ifdef NOTE_NSECONDS
-		nsecs <= 0xFFffFFff ? NOTE_NSECONDS :
-#endif
-#ifdef NOTE_USECONDS
-		NOTE_USECONDS
-#else /* Milliseconds, if no constants are defined */
-		0
-#endif
-		,
-#ifdef NOTE_NSECONDS
-		nsecs <= 0xFFffFFff ? nsecs :
-#endif
-#ifdef NOTE_USECONDS
-	nsecs / 1000
-#else /* Milliseconds, if nothing else is defined */
-	nsecs / 1000000
-#endif
-	, NULL);
-	kevent(our_timer->handle, &kev, 1, NULL, 0, &nowait);
-#endif
-}
-
-static int kqueue_timer_set_rate(void *data, unsigned int rate)
-{
-	struct kqueue_timer *timer = data;
-
-	kqueue_set_nsecs(timer, (timer->nsecs = rate ? (long) (1000000000 / rate) : 0L));
+	struct kqueue_timer *timer = data;
+	uint64_t period_ns;
+	int flags;
+	int units;
+	int retval;
+
+	ao2_lock(timer);
+
+	if (rate == 0) {
+		if (timer->period == 0) {
+			ao2_unlock(timer);
+			return (0);
+		}
+		flags = EV_DELETE;
+		timer->period = 0;
+		units = 0;
+	} else  {
+		flags = EV_ADD | EV_ENABLE;
+		period_ns = (uint64_t)1000000000 / rate;
+		timer->period = kqueue_scale_period(period_ns, &units);
+	}
+	ast_debug(5, "[%d]: Set rate %u:%ju\n",
+		timer->handle, units, (uintmax_t)timer->period);
+	EV_SET(&kev, timer->handle, EVFILT_TIMER, flags, units,
+		timer->period, NULL);
+	retval = kevent(timer->handle, &kev, 1, NULL, 0, NULL);
+
+	if (retval == -1) {
+		ast_log(LOG_ERROR, "[%d]: Error queing timer: %s\n",
+			timer->handle, strerror(errno));
+	}
+
+	ao2_unlock(timer);
 
 	return 0;
 }
 
 static int kqueue_timer_ack(void *data, unsigned int quantity)
 {
-	struct kqueue_timer *timer = data;
-
-	if (timer->unacked < quantity) {
-		ast_debug(1, "Acking more events than have expired?!!\n");
-		timer->unacked = 0;
+	static struct timespec ts_nowait = { 0, 0 };
+	struct kqueue_timer *timer = data;
+	struct kevent kev[2];
+	int i, retval;
+
+	ao2_lock(timer);
+
+	retval = kevent(timer->handle, NULL, 0, kev, 2, &ts_nowait);
+	if (retval == -1) {
+		ast_log(LOG_ERROR, "[%d]: Error sampling kqueue: %s\n",
+			timer->handle, strerror(errno));
+		ao2_unlock(timer);
 		return -1;
+	}
+
+	for (i = 0; i < retval; i++) {
+		switch (kev[i].filter) {
+		case EVFILT_TIMER:
+			if (kev[i].data > quantity) {
+				ast_log(LOG_ERROR, "[%d]: Missed %ju\n",
+					timer->handle,
+					(uintmax_t)kev[i].data - quantity);
+			}
+			break;
+		case CONTINUOUS_EVFILT_TYPE:
+			if (!timer->is_continuous) {
+				ast_log(LOG_ERROR,
+					"[%d]: Spurious user event\n",
+					timer->handle);
+			}
+			break;
+		default:
+			ast_log(LOG_ERROR, "[%d]: Spurious kevent type %d.\n",
+				timer->handle, kev[i].filter);
+		}
+	}
+
+	ao2_unlock(timer);
+
+	return 0;
+}
+
+static int kqueue_timer_enable_continuous(void *data)
+{
+	struct kqueue_timer *timer = data;
+	int retval;
+
+	ao2_lock(timer);
+
+	if (!timer->is_continuous) {
+		ast_debug(5, "[%d]: Enable Continuous\n", timer->handle);
+		retval = kqueue_timer_enable_continuous_event(timer);
+		if (retval == -1) {
+			ast_log(LOG_ERROR,
+				"[%d]: Error signaling continuous event: %s\n",
+				timer->handle, strerror(errno));
+		}
+		timer->is_continuous = 1;
+	}
+
+	ao2_unlock(timer);
+
+	return 0;
+}
+
+static int kqueue_timer_disable_continuous(void *data)
+{
+	struct kqueue_timer *timer = data;
+	int retval;
+
+	ao2_lock(timer);
+
+	if (timer->is_continuous) {
+		ast_debug(5, "[%d]: Disable Continuous\n", timer->handle);
+		retval = kqueue_timer_disable_continuous_event(timer);
+		if (retval == -1) {
+			ast_log(LOG_ERROR,
+				"[%d]: Error clearing continuous event: %s\n",
+				timer->handle, strerror(errno));
+		}
+		timer->is_continuous = 0;
+	}
+
+	ao2_unlock(timer);
+
+	return 0;
+}
+
+static enum ast_timer_event kqueue_timer_get_event(void *data)
+{
+	struct kqueue_timer *timer = data;
+	enum ast_timer_event res;
+
+	if (timer->is_continuous) {
+		res = AST_TIMING_EVENT_CONTINUOUS;
 	} else {
-		timer->unacked -= quantity;
-	}
-
-	return 0;
-}
-
-static int kqueue_timer_enable_continuous(void *data)
-{
-	struct kqueue_timer *timer = data;
-
-	kqueue_set_nsecs(timer, 1);
-	timer->is_continuous = 1;
-	timer->unacked = 0;
-
-	return 0;
-}
-
-static int kqueue_timer_disable_continuous(void *data)
-{
-	struct kqueue_timer *timer = data;
-
-	kqueue_set_nsecs(timer, timer->nsecs);
-	timer->is_continuous = 0;
-	timer->unacked = 0;
-
-	return 0;
-}
-
-static enum ast_timer_event kqueue_timer_get_event(void *data)
-{
-	struct kqueue_timer *timer = data;
-	enum ast_timer_event res = -1;
-	struct timespec sixty_seconds = { 60, 0 };
-	struct kevent kev;
-
-	/* If we have non-ACKed events, just return immediately */
-	if (timer->unacked == 0) {
-		if (kevent(timer->handle, NULL, 0, &kev, 1, &sixty_seconds) > 0) {
-			timer->unacked += kev.data;
-		} else {
-			perror("kevent");
-		}
-	}
-
-	if (timer->unacked > 0) {
-		res = timer->is_continuous ? AST_TIMING_EVENT_CONTINUOUS : AST_TIMING_EVENT_EXPIRED;
+		res = AST_TIMING_EVENT_EXPIRED;
 	}
 
 	return res;
@@ -215,8 +380,7 @@
 
 static unsigned int kqueue_timer_get_max_rate(void *data)
 {
-	/* Actually, the max rate is 2^64-1 seconds, but that's not representable in a 32-bit integer. */
-	return UINT_MAX;
+	return INTPTR_MAX > UINT_MAX ? UINT_MAX : INTPTR_MAX;
 }
 
 static int kqueue_timer_fd(void *data)
@@ -273,8 +437,8 @@
 			res = AST_TEST_FAIL;
 			break;
 		}
-		if (kt->unacked == 0) {
-			ast_test_status_update(test, "Unacked events is 0, but there should be at least 1.\n");
+		if (kqueue_timer_ack(kt, 1) != 0) {
+			ast_test_status_update(test, "Acking event failed.\n");
 			res = AST_TEST_FAIL;
 			break;
 		}
@@ -292,15 +456,15 @@
 				res = AST_TEST_FAIL;
 				break;
 			}
+			if (kqueue_timer_ack(kt, 1) != 0) {
+				ast_test_status_update(test, "Acking event failed.\n");
+				res = AST_TEST_FAIL;
+				break;
+			}
+
 		}
 		diff = ast_tvdiff_us(ast_tvnow(), start);
 		ast_test_status_update(test, "diff is %llu\n", diff);
-		/*
-		if (abs(diff - kt->unacked) == 0) {
-			ast_test_status_update(test, "Unacked events should be around 1000, not %llu\n", kt->unacked);
-			res = AST_TEST_FAIL;
-		}
-		*/
 	} while (0);
 	kqueue_timer_close(kt);
 	return res;
@@ -313,8 +477,8 @@
  * Module loading including tests for configuration or dependencies.
  * This function can return AST_MODULE_LOAD_FAILURE, AST_MODULE_LOAD_DECLINE,
  * or AST_MODULE_LOAD_SUCCESS. If a dependency or environment variable fails
- * tests return AST_MODULE_LOAD_FAILURE. If the module can not load the 
- * configuration file or other non-critical problem return 
+ * tests return AST_MODULE_LOAD_FAILURE. If the module can not load the
+ * configuration file or other non-critical problem return
  * AST_MODULE_LOAD_DECLINE. On success return AST_MODULE_LOAD_SUCCESS.
  */
 static int load_module(void)




More information about the asterisk-commits mailing list