[asterisk-commits] tilghman: branch tilghman/sched-fu r137297 - /team/tilghman/sched-fu/main/
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Tue Aug 12 13:07:39 CDT 2008
Author: tilghman
Date: Tue Aug 12 13:07:38 2008
New Revision: 137297
URL: http://svn.digium.com/view/asterisk?view=rev&rev=137297
Log:
Inexact scheduler, but O(1) insert time
Modified:
team/tilghman/sched-fu/main/sched.c
Modified: team/tilghman/sched-fu/main/sched.c
URL: http://svn.digium.com/view/asterisk/team/tilghman/sched-fu/main/sched.c?view=diff&rev=137297&r1=137296&r2=137297
==============================================================================
--- team/tilghman/sched-fu/main/sched.c (original)
+++ team/tilghman/sched-fu/main/sched.c Tue Aug 12 13:07:38 2008
@@ -45,9 +45,10 @@
#include "asterisk/linkedlists.h"
#include "asterisk/dlinkedlists.h"
#include "asterisk/hashtab.h"
+#include "asterisk/astobj2.h"
struct sched {
- AST_DLLIST_ENTRY(sched) list;
+ AST_LIST_ENTRY(sched) list;
int id; /*!< ID number of event */
struct timeval when; /*!< Absolute time event should take place */
int resched; /*!< When to reschedule */
@@ -57,19 +58,20 @@
};
struct sched_context {
+ int last_executed;
ast_mutex_t lock;
unsigned int eventcnt; /*!< Number of events processed */
unsigned int schedcnt; /*!< Number of outstanding schedule events */
unsigned int highwater; /*!< highest count so far */
- AST_DLLIST_HEAD_NOLOCK(, sched) schedq; /*!< Schedule entry and main queue */
+ struct ao2_container *sched[600];
struct ast_hashtab *schedq_ht; /*!< hash table for fast searching */
-
#ifdef SCHED_MAX_CACHE
AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */
unsigned int schedccnt;
#endif
};
+#define CALC_SLOT(a) (((a).tv_sec % 60) * 10 + ((a).tv_usec / 100000))
/* hash routines for sched */
@@ -79,7 +81,7 @@
const struct sched *bs = b;
return as->id != bs->id; /* return 0 on a match like strcmp would */
}
-
+
static unsigned int sched_hash(const void *obj)
{
const struct sched *s = obj;
@@ -87,6 +89,15 @@
return h;
}
+static int null_hash_fn(const void *obj, const int flags)
+{
+ return 0;
+}
+
+static void sched_destructor(void *data)
+{
+}
+
struct sched_context *sched_context_create(void)
{
struct sched_context *tmp;
@@ -94,36 +105,35 @@
if (!(tmp = ast_calloc(1, sizeof(*tmp))))
return NULL;
- ast_mutex_init(&tmp->lock);
- tmp->eventcnt = 1;
-
tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
-
+ tmp->last_executed = -1;
+
return tmp;
}
void sched_context_destroy(struct sched_context *con)
{
struct sched *s;
-
- ast_mutex_lock(&con->lock);
+ int i;
#ifdef SCHED_MAX_CACHE
/* Eliminate the cache */
while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
- ast_free(s);
+ ao2_ref(s, -1);
#endif
/* And the queue */
- while ((s = AST_DLLIST_REMOVE_HEAD(&con->schedq, list)))
- ast_free(s);
+ for (i = 0; i < 600; i++) {
+ if (con->sched[i]) {
+ ao2_ref(con->sched[i], -1);
+ con->sched[i] = NULL;
+ }
+ }
ast_hashtab_destroy(con->schedq_ht, NULL);
con->schedq_ht = NULL;
-
+
/* And the context */
- ast_mutex_unlock(&con->lock);
- ast_mutex_destroy(&con->lock);
ast_free(con);
}
@@ -140,7 +150,7 @@
con->schedccnt--;
else
#endif
- tmp = ast_calloc(1, sizeof(*tmp));
+ tmp = ao2_alloc(sizeof(*tmp), sched_destructor);
return tmp;
}
@@ -158,7 +168,7 @@
con->schedccnt++;
} else
#endif
- ast_free(tmp);
+ ao2_ref(tmp, -1);
}
/*! \brief
@@ -167,19 +177,51 @@
*/
int ast_sched_wait(struct sched_context *con)
{
- int ms;
+ int ms = INT_MAX, i;
+ struct timeval now = ast_tvnow();
+ int slot = CALC_SLOT(now);
+ struct ao2_iterator aoi;
+ struct sched *s;
DEBUG(ast_debug(1, "ast_sched_wait()\n"));
- ast_mutex_lock(&con->lock);
- if (AST_DLLIST_EMPTY(&con->schedq)) {
- ms = -1;
+ if (slot == con->last_executed + 1) {
+ /* Haven't executed for 599 cycles? Search all. */
+ slot++;
+ }
+
+ if (con->last_executed == -1) {
+ /* first time through, search all */
+ for (i = 0; i < 600; i++) {
+ if (con->sched[i]) {
+ aoi = ao2_iterator_init(con->sched[i], 0);
+ while ((s = ao2_iterator_next(&aoi))) {
+ int tmp;
+ if ((tmp = ast_tvdiff_ms(s->when, now)) < ms) {
+ ms = tmp;
+ }
+ ao2_ref(s, -1);
+ }
+ }
+ }
} else {
- ms = ast_tvdiff_ms(AST_DLLIST_FIRST(&con->schedq)->when, ast_tvnow());
- if (ms < 0)
- ms = 0;
- }
- ast_mutex_unlock(&con->lock);
+ for (i = con->last_executed; i != (slot == 599 ? 1 : slot + 1); i = (i == 599) ? 0 : i + 1) {
+ if (con->sched[i]) {
+ aoi = ao2_iterator_init(con->sched[i], 0);
+ while ((s = ao2_iterator_next(&aoi))) {
+ int tmp;
+ if ((tmp = ast_tvdiff_ms(s->when, now)) < ms) {
+ ms = tmp;
+ }
+ ao2_ref(s, -1);
+ }
+ }
+ }
+ }
+
+ if (ms < 0) {
+ ms = 0;
+ }
return ms;
}
@@ -192,53 +234,15 @@
*/
static void schedule(struct sched_context *con, struct sched *s)
{
- struct sched *cur = NULL;
- int ret;
- int df = 0;
- int de = 0;
- struct sched *first = AST_DLLIST_FIRST(&con->schedq);
- struct sched *last = AST_DLLIST_LAST(&con->schedq);
-
- if (first)
- df = ast_tvdiff_us(s->when, first->when);
- if (last)
- de = ast_tvdiff_us(s->when, last->when);
-
- if (df < 0)
- df = -df;
- if (de < 0)
- de = -de;
-
- if (df < de) {
- AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
- if (ast_tvcmp(s->when, cur->when) == -1) {
- AST_DLLIST_INSERT_BEFORE(&con->schedq, cur, s, list);
- break;
- }
- }
- if (!cur) {
- AST_DLLIST_INSERT_TAIL(&con->schedq, s, list);
- }
- } else {
- AST_DLLIST_TRAVERSE_BACKWARDS(&con->schedq, cur, list) {
- if (ast_tvcmp(s->when, cur->when) == 1) {
- AST_DLLIST_INSERT_AFTER(&con->schedq, cur, s, list);
- break;
- }
- }
- if (!cur) {
- AST_DLLIST_INSERT_HEAD(&con->schedq, s, list);
- }
- }
-
- ret = ast_hashtab_insert_safe(con->schedq_ht, s);
- if (!ret)
- ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n",s->id);
-
- con->schedcnt++;
-
- if (con->schedcnt > con->highwater)
- con->highwater = con->schedcnt;
+ int slot = CALC_SLOT(s->when);
+
+ if (!con->sched[slot]) {
+ con->sched[slot] = ao2_container_alloc(1, null_hash_fn, ao2_match_by_addr);
+ }
+ ao2_link(con->sched[slot], s);
+ if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
+ ast_log(LOG_WARNING, "Schedule Queue entry %d is already in table!\n", s->id);
+ }
}
/*! \brief
@@ -320,14 +324,12 @@
const void *ast_sched_find_data(struct sched_context *con, int id)
{
- struct sched tmp,*res;
+ struct sched tmp, *res;
tmp.id = id;
res = ast_hashtab_lookup(con->schedq_ht, &tmp);
- if (res)
- return res->data;
- return NULL;
-}
-
+ return res ? res->data : NULL;
+}
+
/*! \brief
* Delete the schedule entry with number
* "id". It's nearly impossible that there
@@ -357,14 +359,12 @@
tmp.id = id;
s = ast_hashtab_lookup(con->schedq_ht, &tmp);
if (s) {
- struct sched *x = AST_DLLIST_REMOVE(&con->schedq, s, list);
-
- if (!x)
- ast_log(LOG_WARNING,"sched entry %d not in the schedq list?\n", s->id);
-
- if (!ast_hashtab_remove_this_object(con->schedq_ht, s))
- ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
- con->schedcnt--;
+ int slot = CALC_SLOT(s->when);
+ ao2_unlink(con->sched[slot], s);
+
+ if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
+ ast_log(LOG_WARNING, "Found sched entry %d, then couldn't remove it?\n", s->id);
+ }
sched_release(con, s);
}
@@ -388,7 +388,7 @@
return 0;
}
-
+#if 0
char *ast_sched_report(struct sched_context *con, char *buf, int bufsiz, struct ast_cb_names *cbnames)
{
int *countlist,i;
@@ -398,8 +398,8 @@
buf[0] = 0;
sprintf(buf, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
- countlist = ast_calloc(sizeof(int),cbnames->numassocs+1);
-
+ countlist = ast_calloc(sizeof(int), cbnames->numassocs + 1);
+
AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
/* match the callback to the cblist */
for (i=0;i<cbnames->numassocs;i++) {
@@ -421,7 +421,7 @@
strcat( buf, buf2);
return buf;
}
-
+#endif
/*! \brief Dump the contents of the scheduler to LOG_DEBUG */
@@ -429,6 +429,8 @@
{
struct sched *q;
struct timeval when = ast_tvnow();
+ struct ao2_iterator aoi;
+ int i;
#ifdef SCHED_MAX_CACHE
ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
#else
@@ -438,18 +440,20 @@
ast_debug(1, "=============================================================\n");
ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
- AST_DLLIST_TRAVERSE(&con->schedq, q, list) {
- struct timeval delta = ast_tvsub(q->when, when);
-
- ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
- q->id,
- q->callback,
- q->data,
- (long)delta.tv_sec,
- (long int)delta.tv_usec);
+ for (i = 0; i < 600; i++) {
+ aoi = ao2_iterator_init(con->sched[i], 0);
+ while ((q = ao2_iterator_next(&aoi))) {
+ struct timeval delta = ast_tvsub(q->when, when);
+
+ ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
+ q->id,
+ q->callback,
+ q->data,
+ (long)delta.tv_sec,
+ (long int)delta.tv_usec);
+ }
}
ast_debug(1, "=============================================================\n");
-
}
/*! \brief
@@ -458,71 +462,83 @@
int ast_sched_runq(struct sched_context *con)
{
struct sched *current;
- struct timeval when;
- int numevents;
- int res;
+ struct timeval now = ast_tvnow();
+ int numevents = 0;
+ int res, i, start = 0, end = 600, slot = CALC_SLOT(now);
DEBUG(ast_debug(1, "ast_sched_runq()\n"));
- ast_mutex_lock(&con->lock);
-
- for (numevents = 0; !AST_DLLIST_EMPTY(&con->schedq); numevents++) {
- /* schedule all events which are going to expire within 1ms.
- * We only care about millisecond accuracy anyway, so this will
- * help us get more than one event at one time if they are very
- * close together.
- */
- when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
- if (ast_tvcmp(AST_DLLIST_FIRST(&con->schedq)->when, when) != -1)
- break;
-
- current = AST_DLLIST_REMOVE_HEAD(&con->schedq, list);
- if (!ast_hashtab_remove_this_object(con->schedq_ht, current))
- ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
-
- con->schedcnt--;
-
- /*
- * At this point, the schedule queue is still intact. We
- * have removed the first event and the rest is still there,
- * so it's permissible for the callback to add new events, but
- * trying to delete itself won't work because it isn't in
- * the schedule queue. If that's what it wants to do, it
- * should return 0.
- */
-
- ast_mutex_unlock(&con->lock);
- res = current->callback(current->data);
- ast_mutex_lock(&con->lock);
-
- if (res) {
- /*
- * If they return non-zero, we should schedule them to be
- * run again.
- */
- if (sched_settime(¤t->when, current->variable? res : current->resched)) {
- sched_release(con, current);
- } else
- schedule(con, current);
- } else {
- /* No longer needed, so release it */
- sched_release(con, current);
- }
- }
-
- ast_mutex_unlock(&con->lock);
-
+ if (slot == con->last_executed + 1) {
+ /* Haven't executed for 599 cycles? Search all. */
+ slot++;
+ }
+
+ if (con->last_executed > -1) {
+ start = con->last_executed + 1;
+ end = CALC_SLOT(now);
+ }
+
+ for (i = start; i != end; i = (i == 599) ? 0 : i + 1) {
+ if (con->sched[i]) {
+ struct ao2_iterator aoi = ao2_iterator_init(con->sched[i], 0);
+ while ((current = ao2_iterator_next(&aoi))) {
+ int tmp;
+ /* schedule all events which are going to expire within 1ms.
+ * We only care about millisecond accuracy anyway, so this will
+ * help us get more than one event at one time if they are very
+ * close together.
+ */
+ if ((tmp = ast_tvdiff_ms(current->when, (now = ast_tvnow()))) < 1) {
+ numevents++;
+ ao2_unlink(con->sched[i], current);
+ if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
+ ast_log(LOG_ERROR, "Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
+ }
+ con->schedcnt--;
+
+ /*
+ * At this point, the schedule queue is still intact. We
+ * have removed the first event and the rest is still there,
+ * so it's permissible for the callback to add new events, but
+ * trying to delete itself won't work because it isn't in
+ * the schedule queue. If that's what it wants to do, it
+ * should return 0.
+ */
+ res = current->callback(current->data);
+
+ if (res) {
+ /*
+ * If they return non-zero, we should schedule them to be
+ * run again.
+ */
+ if (sched_settime(¤t->when, current->variable ? res : current->resched)) {
+ sched_release(con, current);
+ } else {
+ schedule(con, current);
+ }
+ } else {
+ /* No longer needed, so release it */
+ sched_release(con, current);
+ }
+ }
+ ao2_ref(current, -1);
+ }
+ }
+ /* Move the endposts, but don't end prematurely */
+ if (CALC_SLOT(now) != end) {
+ end = CALC_SLOT(now);
+ }
+ }
+
return numevents;
}
-long ast_sched_when(struct sched_context *con,int id)
+long ast_sched_when(struct sched_context *con, int id)
{
struct sched *s, tmp;
long secs = -1;
DEBUG(ast_debug(1, "ast_sched_when()\n"));
- ast_mutex_lock(&con->lock);
-
/* these next 2 lines replace a lookup loop */
tmp.id = id;
s = ast_hashtab_lookup(con->schedq_ht, &tmp);
@@ -531,7 +547,6 @@
struct timeval now = ast_tvnow();
secs = s->when.tv_sec - now.tv_sec;
}
- ast_mutex_unlock(&con->lock);
-
+
return secs;
}
More information about the asterisk-commits
mailing list