[asterisk-commits] tilghman: branch tilghman/sched-fu r137297 - /team/tilghman/sched-fu/main/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Tue Aug 12 13:07:39 CDT 2008


Author: tilghman
Date: Tue Aug 12 13:07:38 2008
New Revision: 137297

URL: http://svn.digium.com/view/asterisk?view=rev&rev=137297
Log:
Inexact scheduler, but O(1) insert time

Modified:
    team/tilghman/sched-fu/main/sched.c

Modified: team/tilghman/sched-fu/main/sched.c
URL: http://svn.digium.com/view/asterisk/team/tilghman/sched-fu/main/sched.c?view=diff&rev=137297&r1=137296&r2=137297
==============================================================================
--- team/tilghman/sched-fu/main/sched.c (original)
+++ team/tilghman/sched-fu/main/sched.c Tue Aug 12 13:07:38 2008
@@ -45,9 +45,10 @@
 #include "asterisk/linkedlists.h"
 #include "asterisk/dlinkedlists.h"
 #include "asterisk/hashtab.h"
+#include "asterisk/astobj2.h"
 
 struct sched {
-	AST_DLLIST_ENTRY(sched) list;
+	AST_LIST_ENTRY(sched) list;
 	int id;                       /*!< ID number of event */
 	struct timeval when;          /*!< Absolute time event should take place */
 	int resched;                  /*!< When to reschedule */
@@ -57,19 +58,20 @@
 };
 
 struct sched_context {
+	int last_executed;
 	ast_mutex_t lock;
 	unsigned int eventcnt;                  /*!< Number of events processed */
 	unsigned int schedcnt;                  /*!< Number of outstanding schedule events */
 	unsigned int highwater;					/*!< highest count so far */
-	AST_DLLIST_HEAD_NOLOCK(, sched) schedq;   /*!< Schedule entry and main queue */
+	struct ao2_container *sched[600];
 	struct ast_hashtab *schedq_ht;             /*!< hash table for fast searching */
-
 #ifdef SCHED_MAX_CACHE
 	AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
 	unsigned int schedccnt;
 #endif
 };
 
+#define CALC_SLOT(a)	(((a).tv_sec % 60) * 10 + ((a).tv_usec / 100000))
 
 /* hash routines for sched */
 
@@ -79,7 +81,7 @@
 	const struct sched *bs = b;
 	return as->id != bs->id; /* return 0 on a match like strcmp would */
 }
-
+ 
 static unsigned int sched_hash(const void *obj)
 {
 	const struct sched *s = obj;
@@ -87,6 +89,15 @@
 	return h;
 }
 
+static int null_hash_fn(const void *obj, const int flags)
+{
+	return 0;
+}
+
+static void sched_destructor(void *data)
+{
+}
+
 struct sched_context *sched_context_create(void)
 {
 	struct sched_context *tmp;
@@ -94,36 +105,35 @@
 	if (!(tmp = ast_calloc(1, sizeof(*tmp))))
 		return NULL;
 
-	ast_mutex_init(&tmp->lock);
-	tmp->eventcnt = 1;
-	
 	tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
-	
+	tmp->last_executed = -1;
+
 	return tmp;
 }
 
 void sched_context_destroy(struct sched_context *con)
 {
 	struct sched *s;
-
-	ast_mutex_lock(&con->lock);
+	int i;
 
 #ifdef SCHED_MAX_CACHE
 	/* Eliminate the cache */
 	while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
-		ast_free(s);
+		ao2_ref(s, -1);
 #endif
 
 	/* And the queue */
-	while ((s = AST_DLLIST_REMOVE_HEAD(&con->schedq, list)))
-		ast_free(s);
+	for (i = 0; i < 600; i++) {
+		if (con->sched[i]) {
+			ao2_ref(con->sched[i], -1);
+			con->sched[i] = NULL;
+		}
+	}
 
 	ast_hashtab_destroy(con->schedq_ht, NULL);
 	con->schedq_ht = NULL;
-	
+
 	/* And the context */
-	ast_mutex_unlock(&con->lock);
-	ast_mutex_destroy(&con->lock);
 	ast_free(con);
 }
 
@@ -140,7 +150,7 @@
 		con->schedccnt--;
 	else
 #endif
-		tmp = ast_calloc(1, sizeof(*tmp));
+		tmp = ao2_alloc(sizeof(*tmp), sched_destructor);
 
 	return tmp;
 }
@@ -158,7 +168,7 @@
 		con->schedccnt++;
 	} else
 #endif
-		ast_free(tmp);
+		ao2_ref(tmp, -1);
 }
 
 /*! \brief
@@ -167,19 +177,51 @@
  */
 int ast_sched_wait(struct sched_context *con)
 {
-	int ms;
+	int ms = INT_MAX, i;
+	struct timeval now = ast_tvnow();
+	int slot = CALC_SLOT(now);
+	struct ao2_iterator aoi;
+	struct sched *s;
 
 	DEBUG(ast_debug(1, "ast_sched_wait()\n"));
 
-	ast_mutex_lock(&con->lock);
-	if (AST_DLLIST_EMPTY(&con->schedq)) {
-		ms = -1;
+	if (slot == con->last_executed + 1) {
+		/* Haven't executed for 599 cycles? Search all. */
+		slot++;
+	}
+
+	if (con->last_executed == -1) {
+		/* first time through, search all */
+		for (i = 0; i < 600; i++) {
+			if (con->sched[i]) {
+				aoi = ao2_iterator_init(con->sched[i], 0);
+				while ((s = ao2_iterator_next(&aoi))) {
+					int tmp;
+					if ((tmp = ast_tvdiff_ms(s->when, now)) < ms) {
+						ms = tmp;
+					}
+					ao2_ref(s, -1);
+				}
+			}
+		}
 	} else {
-		ms = ast_tvdiff_ms(AST_DLLIST_FIRST(&con->schedq)->when, ast_tvnow());
-		if (ms < 0)
-			ms = 0;
-	}
-	ast_mutex_unlock(&con->lock);
+		for (i = con->last_executed; i != (slot == 599 ? 1 : slot + 1); i = (i == 599) ? 0 : i + 1) {
+			if (con->sched[i]) {
+				aoi = ao2_iterator_init(con->sched[i], 0);
+				while ((s = ao2_iterator_next(&aoi))) {
+					int tmp;
+					if ((tmp = ast_tvdiff_ms(s->when, now)) < ms) {
+						ms = tmp;
+					}
+					ao2_ref(s, -1);
+				}
+			}
+		}
+	}
+
+	if (ms < 0) {
+		ms = 0;
+	}
 
 	return ms;
 }
@@ -192,53 +234,15 @@
  */
 static void schedule(struct sched_context *con, struct sched *s)
 {
-	struct sched *cur = NULL;
-	int ret;
-	int df = 0;
-	int de = 0;
-	struct sched *first = AST_DLLIST_FIRST(&con->schedq);
-	struct sched *last = AST_DLLIST_LAST(&con->schedq);
-
-	if (first)
-		df = ast_tvdiff_us(s->when, first->when);
-	if (last)
-		de = ast_tvdiff_us(s->when, last->when);
-
-	if (df < 0)
-		df = -df;
-	if (de < 0)
-		de = -de;
-
-	if (df < de) {
-		AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
-			if (ast_tvcmp(s->when, cur->when) == -1) {
-				AST_DLLIST_INSERT_BEFORE(&con->schedq, cur, s, list);
-				break;
-			}
-		}
-		if (!cur) {
-			AST_DLLIST_INSERT_TAIL(&con->schedq, s, list);
-		}
-	} else {
-		AST_DLLIST_TRAVERSE_BACKWARDS(&con->schedq, cur, list) {
-			if (ast_tvcmp(s->when, cur->when) == 1) {
-				AST_DLLIST_INSERT_AFTER(&con->schedq, cur, s, list);
-				break;
-			}
-		}
-		if (!cur) {
-			AST_DLLIST_INSERT_HEAD(&con->schedq, s, list);
-		}
-	}
-
-	ret = ast_hashtab_insert_safe(con->schedq_ht, s);
-	if (!ret)
-		ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n",s->id);
-
-	con->schedcnt++;
-
-	if (con->schedcnt > con->highwater)
-		con->highwater = con->schedcnt;
+	int slot = CALC_SLOT(s->when);
+
+	if (!con->sched[slot]) {
+		con->sched[slot] = ao2_container_alloc(1, null_hash_fn, ao2_match_by_addr);
+	}
+	ao2_link(con->sched[slot], s);
+	if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
+		ast_log(LOG_WARNING, "Schedule Queue entry %d is already in table!\n", s->id);
+	}
 }
 
 /*! \brief
@@ -320,14 +324,12 @@
 
 const void *ast_sched_find_data(struct sched_context *con, int id)
 {
-	struct sched tmp,*res;
+	struct sched tmp, *res;
 	tmp.id = id;
 	res = ast_hashtab_lookup(con->schedq_ht, &tmp);
-	if (res)
-		return res->data;
-	return NULL;
-}
-	
+	return res ? res->data : NULL;
+}
+
 /*! \brief
  * Delete the schedule entry with number
  * "id".  It's nearly impossible that there
@@ -357,14 +359,12 @@
 	tmp.id = id;
 	s = ast_hashtab_lookup(con->schedq_ht, &tmp);
 	if (s) {
-		struct sched *x = AST_DLLIST_REMOVE(&con->schedq, s, list);
-		
-		if (!x)
-			ast_log(LOG_WARNING,"sched entry %d not in the schedq list?\n", s->id);
-
-		if (!ast_hashtab_remove_this_object(con->schedq_ht, s))
-			ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
-		con->schedcnt--;
+		int slot = CALC_SLOT(s->when);
+		ao2_unlink(con->sched[slot], s);
+
+		if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
+			ast_log(LOG_WARNING, "Found sched entry %d, then couldn't remove it?\n", s->id);
+		}
 		sched_release(con, s);
 	}
 	
@@ -388,7 +388,7 @@
 	return 0;
 }
 
-
+#if 0
 char *ast_sched_report(struct sched_context *con, char *buf, int bufsiz, struct ast_cb_names *cbnames)
 {
 	int *countlist,i;
@@ -398,8 +398,8 @@
 	
 	buf[0] = 0;
 	sprintf(buf, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
-	countlist = ast_calloc(sizeof(int),cbnames->numassocs+1);
-	
+	countlist = ast_calloc(sizeof(int), cbnames->numassocs + 1);
+
 	AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
 		/* match the callback to the cblist */
 		for (i=0;i<cbnames->numassocs;i++) {
@@ -421,7 +421,7 @@
 	strcat( buf, buf2);
 	return buf;
 }
-
+#endif
 
 	
 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
@@ -429,6 +429,8 @@
 {
 	struct sched *q;
 	struct timeval when = ast_tvnow();
+	struct ao2_iterator aoi;
+	int i;
 #ifdef SCHED_MAX_CACHE
 	ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
 #else
@@ -438,18 +440,20 @@
 	ast_debug(1, "=============================================================\n");
 	ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
 	ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
-	AST_DLLIST_TRAVERSE(&con->schedq, q, list) {
-		struct timeval delta = ast_tvsub(q->when, when);
-
-		ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
-			q->id,
-			q->callback,
-			q->data,
-			(long)delta.tv_sec,
-			(long int)delta.tv_usec);
+	for (i = 0; i < 600; i++) {
+		aoi = ao2_iterator_init(con->sched[i], 0);
+		while ((q = ao2_iterator_next(&aoi))) {
+			struct timeval delta = ast_tvsub(q->when, when);
+
+			ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
+				q->id,
+				q->callback,
+				q->data,
+				(long)delta.tv_sec,
+				(long int)delta.tv_usec);
+		}
 	}
 	ast_debug(1, "=============================================================\n");
-	
 }
 
 /*! \brief
@@ -458,71 +462,83 @@
 int ast_sched_runq(struct sched_context *con)
 {
 	struct sched *current;
-	struct timeval when;
-	int numevents;
-	int res;
+	struct timeval now = ast_tvnow();
+	int numevents = 0;
+	int res, i, start = 0, end = 600, slot = CALC_SLOT(now);
 
 	DEBUG(ast_debug(1, "ast_sched_runq()\n"));
 		
-	ast_mutex_lock(&con->lock);
-
-	for (numevents = 0; !AST_DLLIST_EMPTY(&con->schedq); numevents++) {
-		/* schedule all events which are going to expire within 1ms.
-		 * We only care about millisecond accuracy anyway, so this will
-		 * help us get more than one event at one time if they are very
-		 * close together.
-		 */
-		when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
-		if (ast_tvcmp(AST_DLLIST_FIRST(&con->schedq)->when, when) != -1)
-			break;
-		
-		current = AST_DLLIST_REMOVE_HEAD(&con->schedq, list);
-		if (!ast_hashtab_remove_this_object(con->schedq_ht, current))
-			ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
-
-		con->schedcnt--;
-
-		/*
-		 * At this point, the schedule queue is still intact.  We
-		 * have removed the first event and the rest is still there,
-		 * so it's permissible for the callback to add new events, but
-		 * trying to delete itself won't work because it isn't in
-		 * the schedule queue.  If that's what it wants to do, it 
-		 * should return 0.
-		 */
-			
-		ast_mutex_unlock(&con->lock);
-		res = current->callback(current->data);
-		ast_mutex_lock(&con->lock);
-			
-		if (res) {
-		 	/*
-			 * If they return non-zero, we should schedule them to be
-			 * run again.
-			 */
-			if (sched_settime(&current->when, current->variable? res : current->resched)) {
-				sched_release(con, current);
-			} else
-				schedule(con, current);
-		} else {
-			/* No longer needed, so release it */
-		 	sched_release(con, current);
-		}
-	}
-
-	ast_mutex_unlock(&con->lock);
-	
+	if (slot == con->last_executed + 1) {
+		/* Haven't executed for 599 cycles? Search all. */
+		slot++;
+	}
+
+	if (con->last_executed > -1) {
+		start = con->last_executed + 1;
+		end = CALC_SLOT(now);
+	}
+
+	for (i = start; i != end; i = (i == 599) ? 0 : i + 1) {
+		if (con->sched[i]) {
+			struct ao2_iterator aoi = ao2_iterator_init(con->sched[i], 0);
+			while ((current = ao2_iterator_next(&aoi))) {
+				int tmp;
+				/* schedule all events which are going to expire within 1ms.
+				 * We only care about millisecond accuracy anyway, so this will
+				 * help us get more than one event at one time if they are very
+				 * close together.
+				 */
+				if ((tmp = ast_tvdiff_ms(current->when, (now = ast_tvnow()))) < 1) {
+					numevents++;
+					ao2_unlink(con->sched[i], current);
+					if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
+						ast_log(LOG_ERROR, "Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
+					}
+					con->schedcnt--;
+
+					/*
+					 * At this point, the schedule queue is still intact.  We
+					 * have removed the first event and the rest is still there,
+					 * so it's permissible for the callback to add new events, but
+					 * trying to delete itself won't work because it isn't in
+					 * the schedule queue.  If that's what it wants to do, it 
+					 * should return 0.
+					 */
+					res = current->callback(current->data);
+
+					if (res) {
+					 	/*
+						 * If they return non-zero, we should schedule them to be
+						 * run again.
+						 */
+						if (sched_settime(&current->when, current->variable ? res : current->resched)) {
+							sched_release(con, current);
+						} else {
+							schedule(con, current);
+						}
+					} else {
+						/* No longer needed, so release it */
+					 	sched_release(con, current);
+					}
+				}
+				ao2_ref(current, -1);
+			}
+		}
+		/* Move the endposts, but don't end prematurely */
+		if (CALC_SLOT(now) != end) {
+			end = CALC_SLOT(now);
+		}
+	}
+
 	return numevents;
 }
 
-long ast_sched_when(struct sched_context *con,int id)
+long ast_sched_when(struct sched_context *con, int id)
 {
 	struct sched *s, tmp;
 	long secs = -1;
 	DEBUG(ast_debug(1, "ast_sched_when()\n"));
 
-	ast_mutex_lock(&con->lock);
-	
 	/* these next 2 lines replace a lookup loop */
 	tmp.id = id;
 	s = ast_hashtab_lookup(con->schedq_ht, &tmp);
@@ -531,7 +547,6 @@
 		struct timeval now = ast_tvnow();
 		secs = s->when.tv_sec - now.tv_sec;
 	}
-	ast_mutex_unlock(&con->lock);
-	
+
 	return secs;
 }




More information about the asterisk-commits mailing list