[svn-commits] tilghman: trunk r255117 - /trunk/pbx/pbx_spool.c

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sat Mar 27 01:09:30 CDT 2010


Author: tilghman
Date: Sat Mar 27 01:09:26 2010
New Revision: 255117

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=255117
Log:
inotify support for pbx_spool

This should give a good speed boost, in that one particular thread isn't waking
up once a second to read directory contents.

Reviewboard: https://reviewboard.asterisk.org/r/137/

Modified:
    trunk/pbx/pbx_spool.c

Modified: trunk/pbx/pbx_spool.c
URL: http://svnview.digium.com/svn/asterisk/trunk/pbx/pbx_spool.c?view=diff&rev=255117&r1=255116&r2=255117
==============================================================================
--- trunk/pbx/pbx_spool.c (original)
+++ trunk/pbx/pbx_spool.c Sat Mar 27 01:09:26 2010
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2010, Digium, Inc.
  *
  * Mark Spencer <markster at digium.com>
  *
@@ -30,6 +30,9 @@
 #include <time.h>
 #include <utime.h>
 #include <dirent.h>
+#ifdef HAVE_INOTIFY
+#include <sys/inotify.h>
+#endif
 
 #include "asterisk/paths.h"	/* use ast_config_AST_SPOOL_DIR */
 #include "asterisk/lock.h"
@@ -107,7 +110,7 @@
 	ast_free(o);
 }
 
-static int apply_outgoing(struct outgoing *o, char *fn, FILE *f)
+static int apply_outgoing(struct outgoing *o, const char *fn, FILE *f)
 {
 	char buf[256];
 	char *c, *c2;
@@ -364,7 +367,7 @@
 	}
 }
 
-static int scan_service(char *fn, time_t now, time_t atime)
+static int scan_service(const char *fn, time_t now)
 {
 	struct outgoing *o = NULL;
 	FILE *f;
@@ -432,6 +435,141 @@
 	return res;
 }
 
+#ifdef HAVE_INOTIFY
+/*!
+ * \brief One spool file waiting to be (re)examined at a later time.
+ *
+ * Allocated with room for the path appended directly after the structure,
+ * i.e. sizeof(struct direntry) + strlen(path) + 1 bytes.
+ */
+struct direntry {
+	AST_LIST_ENTRY(direntry) list;	/*!< Linkage within dirlist */
+	time_t mtime;			/*!< Time at which the file should next be looked at */
+	char name[];			/*!< Path of the file (C99 flexible array member) */
+};
+
+/*!
+ * \brief Pending spool files, kept sorted by ascending mtime.
+ *
+ * Only one thread is accessing this list, so no lock is necessary.
+ */
+static AST_LIST_HEAD_NOLOCK_STATIC(dirlist, direntry);
+
+/*!
+ * \brief Examine a spool file and, if it is not finished, queue it for later.
+ *
+ * \param filename Name of the file. A name that does not start with '/' is
+ *        taken to be relative to qdir.
+ * \param when Time at which the file is due, or 0 to use the file's mtime
+ *        as reported by stat(2). With 0, anything that is not a regular
+ *        file is silently ignored.
+ *
+ * If the due time is still in the future the file is queued for that time.
+ * Otherwise scan_service() is run on it right away; a positive return value
+ * is used as the time at which the file must be looked at again, while any
+ * other return value means there is nothing further to queue.
+ *
+ * Entries are inserted into dirlist in ascending mtime order. An allocation
+ * failure drops the entry without queueing it.
+ */
+static void queue_file(const char *filename, time_t when)
+{
+	struct stat st;
+	struct direntry *cur, *new;
+	int found = 0;
+	time_t now = time(NULL);
+	/* Kept as time_t so that a 64-bit timestamp is never squeezed through an int */
+	time_t due;
+
+	if (filename[0] != '/') {
+		char *fn = alloca(strlen(qdir) + strlen(filename) + 2);
+		sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
+		filename = fn;
+	}
+
+	if (when == 0) {
+		if (stat(filename, &st)) {
+			ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
+			return;
+		}
+
+		if (!S_ISREG(st.st_mode)) {
+			return;
+		}
+
+		when = st.st_mtime;
+	}
+
+	if (when > now) {
+		/* Not due yet; just remember it */
+		due = when;
+	} else {
+		int res = scan_service(filename, now);
+		if (res <= 0) {
+			/* Nothing to schedule for this file */
+			return;
+		}
+		due = res;
+	}
+
+	if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
+		return;
+	}
+	new->mtime = due;
+	strcpy(new->name, filename); /* SAFE: allocation above is sized for the name */
+
+	/* List is ordered by mtime: insert before the first later entry. An
+	 * empty list simply falls through to the tail insertion below. */
+	AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
+		if (cur->mtime > new->mtime) {
+			AST_LIST_INSERT_BEFORE_CURRENT(new, list);
+			found = 1;
+			break;
+		}
+	}
+	AST_LIST_TRAVERSE_SAFE_END;
+
+	if (!found) {
+		AST_LIST_INSERT_TAIL(&dirlist, new, list);
+	}
+}
+
+/*!
+ * \brief Spool directory service thread (inotify variant).
+ *
+ * \param unused Not used.
+ * \return NULL, and only if inotify or the spool directory cannot be set up;
+ *         otherwise the thread loops forever.
+ *
+ * After waiting for Asterisk to be fully booted, the thread installs an
+ * inotify watch on qdir, queues every file already present, and then
+ * alternates between waiting (for the earliest queued time or for an
+ * inotify event, whichever comes first) and processing the entries of
+ * dirlist that have become due.
+ */
+static void *scan_thread(void *unused)
+{
+	DIR *dir;
+	struct dirent *de;
+	time_t now;
+	struct timespec ts = { .tv_sec = 1 };
+	int inotify_fd = inotify_init();
+	struct direntry *cur;
+	/* The union gives the raw byte buffer the alignment required for
+	 * struct inotify_event. A single read(2) may return several events,
+	 * each followed by its variable-length name. */
+	union {
+		struct inotify_event iev;
+		char raw[sizeof(struct inotify_event) + FILENAME_MAX + 1];
+	} buf;
+	struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
+
+	while (!ast_fully_booted) {
+		nanosleep(&ts, NULL);
+	}
+
+	if (inotify_fd < 0) {
+		ast_log(LOG_ERROR, "Unable to initialize inotify(7)\n");
+		return NULL;
+	}
+
+	/* Without a watch no new file would ever be noticed, so treat failure as fatal */
+	if (inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_ATTRIB | IN_MOVED_TO) < 0) {
+		ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno));
+		close(inotify_fd);
+		return NULL;
+	}
+
+	/* First, run through the directory and queue the entries that already exist */
+	if (!(dir = opendir(qdir))) {
+		ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
+		close(inotify_fd);
+		return NULL;
+	}
+
+	while ((de = readdir(dir))) {
+		queue_file(de->d_name, 0);
+	}
+	closedir(dir);
+
+	/* Wait for either a) next timestamp to occur, or b) a change to happen */
+	for (;/* ever */;) {
+		int idle = AST_LIST_EMPTY(&dirlist);
+		time_t next = idle ? 0 : AST_LIST_FIRST(&dirlist)->mtime;
+
+		time(&now);
+		if (idle || next > now) {
+			/* With nothing in the queue we wait forever for an event */
+			int waittime = -1;
+			int res;
+
+			if (!idle) {
+				/* Convert from seconds to milliseconds, clamping so that a
+				 * file scheduled far in the future cannot overflow the int
+				 * timeout. A clamped wait just means one extra wakeup. */
+				time_t delta = next - now;
+				waittime = delta > INT_MAX / 1000 ? INT_MAX : (int) (delta * 1000);
+			}
+
+			res = poll(&pfd, 1, waittime);
+			if (res > 0) {
+				/* Signed length, so that a -1 from read(2) is never mistaken
+				 * for a large byte count. */
+				ssize_t len = read(inotify_fd, buf.raw, sizeof(buf.raw));
+
+				if (len < 0) {
+					if (errno != EINTR && errno != EAGAIN) {
+						ast_debug(1, "Got an error back from %s(2): %s\n", "read", strerror(errno));
+					}
+				} else {
+					size_t off = 0;
+
+					/* File(s) added to directory, add each of them to my list */
+					while (off + sizeof(struct inotify_event) <= (size_t) len) {
+						struct inotify_event *iev = (struct inotify_event *) (buf.raw + off);
+						size_t evsize = sizeof(*iev) + iev->len;
+
+						if (off + evsize > (size_t) len) {
+							/* Truncated trailing event; nothing usable in it */
+							break;
+						}
+						/* Events without a name do not refer to a file in the directory */
+						if (iev->len > 0) {
+							queue_file(iev->name, 0);
+						}
+						off += evsize;
+					}
+				}
+			} else if (res < 0 && errno != EINTR && errno != EAGAIN) {
+				ast_debug(1, "Got an error back from %s(2): %s\n", "poll", strerror(errno));
+			}
+			time(&now);
+		}
+
+		/* Empty the list of all entries ready to be processed */
+		while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
+			cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
+			/* queue_file() re-queues the file itself if it needs another look later */
+			queue_file(cur->name, cur->mtime);
+			ast_free(cur);
+		}
+	}
+	return NULL;
+}
+
+#else
 static void *scan_thread(void *unused)
 {
 	struct stat st;
@@ -481,7 +619,7 @@
 			if (!S_ISREG(st.st_mode))
 				continue;
 			if (st.st_mtime <= now) {
-				res = scan_service(fn, now, st.st_atime);
+				res = scan_service(fn, now);
 				if (res > 0) {
 					/* Update next service time */
 					if (!next || (res < next)) {
@@ -503,6 +641,7 @@
 	}
 	return NULL;
 }
+#endif
 
 static int unload_module(void)
 {




More information about the svn-commits mailing list