[asterisk-commits] russell: branch russell/sched_thread2 r170835 - in /team/russell/sched_thread...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Jan 23 23:29:34 CST 2009


Author: russell
Date: Fri Jan 23 23:29:34 2009
New Revision: 170835

URL: http://svn.digium.com/svn-view/asterisk?view=rev&rev=170835
Log:
add sched thread and chan_iax2 conversion.  need to document the changes to sched.h ...

Modified:
    team/russell/sched_thread2/channels/chan_iax2.c
    team/russell/sched_thread2/include/asterisk/sched.h
    team/russell/sched_thread2/main/sched.c

Modified: team/russell/sched_thread2/channels/chan_iax2.c
URL: http://svn.digium.com/svn-view/asterisk/team/russell/sched_thread2/channels/chan_iax2.c?view=diff&rev=170835&r1=170834&r2=170835
==============================================================================
--- team/russell/sched_thread2/channels/chan_iax2.c (original)
+++ team/russell/sched_thread2/channels/chan_iax2.c Fri Jan 23 23:29:34 2009
@@ -305,7 +305,7 @@
 	} while(0)
 
 static	struct io_context *io;
-static	struct sched_context *sched;
+static	struct ast_sched_thread *sched;
 
 static int iax2_capability = IAX_CAPABILITY_FULLBANDWIDTH;
 
@@ -332,9 +332,6 @@
 static struct ast_flags globalflags = { 0 };
 
 static pthread_t netthreadid = AST_PTHREADT_NULL;
-static pthread_t schedthreadid = AST_PTHREADT_NULL;
-AST_MUTEX_DEFINE_STATIC(sched_lock);
-static ast_cond_t sched_cond;
 
 enum iax2_state {
 	IAX_STATE_STARTED = 		(1 << 0),
@@ -1258,22 +1255,18 @@
 #define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__)
 #endif
 
-static int iax2_sched_replace(int id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
-{
-	AST_SCHED_REPLACE(id, con, when, callback, data);
-	signal_condition(&sched_lock, &sched_cond);
-
-	return id;
-}
-
-static int iax2_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
-{
-	int res;
-
-	res = ast_sched_add(con, when, callback, data);
-	signal_condition(&sched_lock, &sched_cond);
-
-	return res;
+static int iax2_sched_replace(int id, struct ast_sched_thread *st, int when, 
+		ast_sched_cb callback, const void *data)
+{
+	ast_sched_thread_del(st, id);	/* attempts to remove the old entry when id > -1; only this function's local copy of id is reset to -1 */
+	/* NOTE(review): the delete result is ignored; the new entry is added regardless and its id is what gets returned */
+	return ast_sched_thread_add(st, when, callback, data);
+}
+
+static int iax2_sched_add(struct ast_sched_thread *st, int when, 
+		ast_sched_cb callback, const void *data)
+{
+	return ast_sched_thread_add(st, when, callback, data);	/* thin wrapper; ast_sched_thread_add() also signals the scheduler thread on success */
 }
 
 static int send_ping(const void *data);
@@ -1522,18 +1515,18 @@
 		ast_clear_flag(pvt, IAX_MAXAUTHREQ);
 	}
 	/* No more pings or lagrq's */
-	AST_SCHED_DEL_SPINLOCK(sched, pvt->pingid, &iaxsl[pvt->callno]);
-	AST_SCHED_DEL_SPINLOCK(sched, pvt->lagid, &iaxsl[pvt->callno]);
-	AST_SCHED_DEL(sched, pvt->autoid);
-	AST_SCHED_DEL(sched, pvt->authid);
-	AST_SCHED_DEL(sched, pvt->initid);
-	AST_SCHED_DEL(sched, pvt->jbid);
-	AST_SCHED_DEL(sched, pvt->keyrotateid);
+	AST_SCHED_DEL_SPINLOCK(ast_sched_thread_get_context(sched), pvt->pingid, &iaxsl[pvt->callno]);
+	AST_SCHED_DEL_SPINLOCK(ast_sched_thread_get_context(sched), pvt->lagid, &iaxsl[pvt->callno]);
+	ast_sched_thread_del(sched, pvt->autoid);
+	ast_sched_thread_del(sched, pvt->authid);
+	ast_sched_thread_del(sched, pvt->initid);
+	ast_sched_thread_del(sched, pvt->jbid);
+	ast_sched_thread_del(sched, pvt->keyrotateid);
 }
 
 static void iax2_frame_free(struct iax_frame *fr)
 {
-	AST_SCHED_DEL(sched, fr->retrans);
+	ast_sched_thread_del(sched, fr->retrans);
 	iax_frame_free(fr);
 }
 
@@ -1722,8 +1715,8 @@
 			 * \note We delete these before switching the slot, because if
 			 * they fire in the meantime, they will generate a warning.
 			 */
-			AST_SCHED_DEL(sched, iaxs[callno]->pingid);
-			AST_SCHED_DEL(sched, iaxs[callno]->lagid);
+			ast_sched_thread_del(sched, iaxs[callno]->pingid);
+			ast_sched_thread_del(sched, iaxs[callno]->lagid);
 			iaxs[x] = iaxs[callno];
 			iaxs[x]->callno = x;
 			iaxs[callno] = NULL;
@@ -3189,7 +3182,7 @@
 
 		jb_reset(iaxs[fr->callno]->jb);
 
-		AST_SCHED_DEL(sched, iaxs[fr->callno]->jbid);
+		ast_sched_thread_del(sched, iaxs[fr->callno]->jbid);
 
 		/* deliver this frame now */
 		if (tsout)
@@ -3229,7 +3222,7 @@
 	/* Wake up the network and scheduler thread */
 	if (netthreadid != AST_PTHREADT_NULL)
 		pthread_kill(netthreadid, SIGURG);
-	signal_condition(&sched_lock, &sched_cond);
+	ast_sched_thread_poke(sched);
 	return 0;
 }
 
@@ -3365,7 +3358,7 @@
 		ast_copy_flags(peer, &globalflags, IAX_RTAUTOCLEAR|IAX_RTCACHEFRIENDS);
 		if (ast_test_flag(peer, IAX_RTAUTOCLEAR)) {
  			if (peer->expire > -1) {
- 				if (!ast_sched_del(sched, peer->expire)) {
+ 				if (!ast_sched_thread_del(sched, peer->expire)) {
  					peer->expire = -1;
  					peer_unref(peer);
  				}
@@ -3923,7 +3916,7 @@
 			ast_debug(1, "Really destroying %s now...\n", c->name);
 			iax2_destroy(callno);
 		} else if (iaxs[callno]) {
-			if (ast_sched_add(sched, 10000, scheduled_destroy, CALLNO_TO_PTR(callno)) < 0) {
+			if (ast_sched_thread_add(sched, 10000, scheduled_destroy, CALLNO_TO_PTR(callno)) < 0) {
 				ast_log(LOG_ERROR, "Unable to schedule iax2 callno %d destruction?!!  Destroying immediately.\n", callno);
 				iax2_destroy(callno);
 			}
@@ -4026,7 +4019,7 @@
 	ast_mutex_lock(&iaxsl[pvt->callno]);
 
 	pvt->keyrotateid = 
-		ast_sched_add(sched, 120000 + (ast_random() % 180001), iax2_key_rotate, vpvt);
+		ast_sched_thread_add(sched, 120000 + (ast_random() % 180001), iax2_key_rotate, vpvt);
 
 	snprintf(key, sizeof(key), "%lX", ast_random());
 
@@ -7077,14 +7070,14 @@
 static void unlink_peer(struct iax2_peer *peer)
 {
 	if (peer->expire > -1) {
-		if (!ast_sched_del(sched, peer->expire)) {
+		if (!ast_sched_thread_del(sched, peer->expire)) {
 			peer->expire = -1;
 			peer_unref(peer);
 		}
 	}
 
 	if (peer->pokeexpire > -1) {
-		if (!ast_sched_del(sched, peer->pokeexpire)) {
+		if (!ast_sched_thread_del(sched, peer->pokeexpire)) {
 			peer->pokeexpire = -1;
 			peer_unref(peer);
 		}
@@ -7158,7 +7151,7 @@
 					p->addr.sin_addr = in;
 					p->addr.sin_port = htons(atoi(c));
  					if (p->expire > -1) {
- 						if (!ast_sched_del(sched, p->expire)) {
+ 						if (!ast_sched_thread_del(sched, p->expire)) {
  							p->expire = -1;
  							peer_unref(p);
  						}
@@ -7254,7 +7247,7 @@
 	p->sockfd = fd;
 	/* Setup the expiry */
 	if (p->expire > -1) {
-		if (!ast_sched_del(sched, p->expire)) {
+		if (!ast_sched_thread_del(sched, p->expire)) {
 			p->expire = -1;
 			peer_unref(p);
 		}
@@ -8740,7 +8733,7 @@
 			}
 		}
 		if (f.frametype == AST_FRAME_IAX) {
-			AST_SCHED_DEL(sched, iaxs[fr->callno]->initid);
+			ast_sched_thread_del(sched, iaxs[fr->callno]->initid);
 			/* Handle the IAX pseudo frame itself */
 			if (iaxdebug)
 				ast_debug(1, "IAX subclass %d received\n", f.subclass);
@@ -9227,7 +9220,7 @@
 
 					/* Remove scheduled iax2_poke_noanswer */
 					if (peer->pokeexpire > -1) {
-						if (!ast_sched_del(sched, peer->pokeexpire)) {
+						if (!ast_sched_thread_del(sched, peer->pokeexpire)) {
 							peer_unref(peer);
 							peer->pokeexpire = -1;
 						}
@@ -10345,7 +10338,7 @@
 	iaxs[peer->callno]->peerpoke = peer;
 
  	if (peer->pokeexpire > -1) {
- 		if (!ast_sched_del(sched, peer->pokeexpire)) {
+ 		if (!ast_sched_thread_del(sched, peer->pokeexpire)) {
  			peer->pokeexpire = -1;
  			peer_unref(peer);
  		}
@@ -10458,34 +10451,6 @@
 	}
 
 	return c;
-}
-
-static void *sched_thread(void *ignore)
-{
-	int count;
-	int res;
-	struct timeval wait;
-	struct timespec ts;
-
-	for (;;) {
-		pthread_testcancel();
-		ast_mutex_lock(&sched_lock);
-		res = ast_sched_wait(sched);
-		if ((res > 1000) || (res < 0))
-			res = 1000;
-		wait = ast_tvadd(ast_tvnow(), ast_samp2tv(res, 1000));
-		ts.tv_sec = wait.tv_sec;
-		ts.tv_nsec = wait.tv_usec * 1000;
-		ast_cond_timedwait(&sched_cond, &sched_lock, &ts);
-		ast_mutex_unlock(&sched_lock);
-		pthread_testcancel();
-
-		count = ast_sched_runq(sched);
-		if (count >= 20)
-			ast_debug(1, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
-	}
-
-	return NULL;
 }
 
 static void *network_thread(void *ignore)
@@ -10575,7 +10540,6 @@
 			AST_LIST_UNLOCK(&idle_list);
 		}
 	}
-	ast_pthread_create_background(&schedthreadid, NULL, sched_thread, NULL);
 	ast_pthread_create_background(&netthreadid, NULL, network_thread, NULL);
 	ast_verb(2, "%d helper threads started\n", threadcount);
 	return 0;
@@ -10848,7 +10812,7 @@
 					}
 				} else {
 					/* Non-dynamic.  Make sure we become that way if we're not */
-					AST_SCHED_DEL(sched, peer->expire);
+					ast_sched_thread_del(sched, peer->expire);
 					ast_clear_flag(peer, IAX_DYNAMIC);
 					if (ast_dnsmgr_lookup(v->value, &peer->addr, &peer->dnsmgr, srvlookup ? "_iax._udp" : NULL))
 						return peer_unref(peer);
@@ -11221,7 +11185,7 @@
 
 	AST_LIST_LOCK(&registrations);
 	while ((reg = AST_LIST_REMOVE_HEAD(&registrations, entry))) {
-		AST_SCHED_DEL(sched, reg->expire);
+		ast_sched_thread_del(sched, reg->expire);
 		if (reg->callno) {
 			int callno = reg->callno;
 			ast_mutex_lock(&iaxsl[callno]);
@@ -12404,21 +12368,13 @@
 	/* Cancel the network thread, close the net socket */
 	if (netthreadid != AST_PTHREADT_NULL) {
 		AST_LIST_LOCK(&frame_queue);
-		ast_mutex_lock(&sched_lock);
 		pthread_cancel(netthreadid);
-		ast_cond_signal(&sched_cond);
-		ast_mutex_unlock(&sched_lock);	/* Release the schedule lock resource */
 		AST_LIST_UNLOCK(&frame_queue);
 		pthread_join(netthreadid, NULL);
 	}
-	if (schedthreadid != AST_PTHREADT_NULL) {
-		ast_mutex_lock(&sched_lock);
-		pthread_cancel(schedthreadid);
-		ast_cond_signal(&sched_cond);
-		ast_mutex_unlock(&sched_lock);
-		pthread_join(schedthreadid, NULL);
-	}
-
+
+	sched = ast_sched_thread_destroy(sched);
+	
 	/* Call for all threads to halt */
 	AST_LIST_LOCK(&idle_list);
 	while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, list)))
@@ -12456,7 +12412,6 @@
 	ast_channel_unregister(&iax2_tech);
 	delete_users();
 	iax_provision_unload();
-	sched_context_destroy(sched);
 	reload_firmware(1);
 
 	for (x = 0; x < ARRAY_LEN(iaxsl); x++) {
@@ -12548,23 +12503,21 @@
 		ast_mutex_init(&iaxsl[x]);
 	}
 
-	ast_cond_init(&sched_cond, NULL);
-
-	if (!(sched = sched_context_create())) {
-		ast_log(LOG_ERROR, "Failed to create scheduler context\n");
+	if (!(sched = ast_sched_thread_create())) {
+		ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
 	if (!(io = io_context_create())) {
 		ast_log(LOG_ERROR, "Failed to create I/O context\n");
-		sched_context_destroy(sched);
+		sched = ast_sched_thread_destroy(sched);
 		return AST_MODULE_LOAD_FAILURE;
 	}
 
 	if (!(netsock = ast_netsock_list_alloc())) {
 		ast_log(LOG_ERROR, "Failed to create netsock list\n");
 		io_context_destroy(io);
-		sched_context_destroy(sched);
+		sched = ast_sched_thread_destroy(sched);
 		return AST_MODULE_LOAD_FAILURE;
 	}
 	ast_netsock_init(netsock);
@@ -12573,7 +12526,7 @@
 	if (!outsock) {
 		ast_log(LOG_ERROR, "Could not allocate outsock list.\n");
 		io_context_destroy(io);
-		sched_context_destroy(sched);
+		sched = ast_sched_thread_destroy(sched);
 		return AST_MODULE_LOAD_FAILURE;
 	}
 	ast_netsock_init(outsock);

Modified: team/russell/sched_thread2/include/asterisk/sched.h
URL: http://svn.digium.com/svn-view/asterisk/team/russell/sched_thread2/include/asterisk/sched.h?view=diff&rev=170835&r1=170834&r2=170835
==============================================================================
--- team/russell/sched_thread2/include/asterisk/sched.h (original)
+++ team/russell/sched_thread2/include/asterisk/sched.h Fri Jan 23 23:29:34 2009
@@ -49,15 +49,17 @@
  * and not a copy of the value of the id.
  */
 #define AST_SCHED_DEL(sched, id) \
-	do { \
+	({ \
 		int _count = 0; \
-		while (id > -1 && ast_sched_del(sched, id) && ++_count < 10) { \
-			usleep(1); \
-		} \
-		if (_count == 10) \
-			ast_debug(3, "Unable to cancel schedule ID %d.\n", id); \
+		int _sched_res = -1; \
+		while (id > -1 && (_sched_res = ast_sched_del(sched, id)) && ++_count < 10) \
+			usleep(1); \
+		if (_count == 10 && option_debug > 2) { \
+			ast_log(LOG_DEBUG, "Unable to cancel schedule ID %d.\n", id); \
+		} \
 		id = -1; \
-	} while (0);
+		(_sched_res); \
+	})
 
 #define AST_SCHED_DEL_UNREF(sched, id, refcall)			\
 	do { \
@@ -282,6 +284,32 @@
 	} \
 } while(0)
 
+/*!
+ * \brief An opaque type representing a scheduler thread
+ */
+struct ast_sched_thread;
+/*! \brief Create a scheduler context and start a thread that services it. \return the new scheduler thread, or NULL on failure */
+struct ast_sched_thread *ast_sched_thread_create(void);
+/*! \brief Stop and join the thread (if one was started), destroy its context and free st. st must not be NULL. \return NULL, always */
+struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st);
+/*! \brief Add an entry with ast_sched_add() on the thread's context and, unless that returned -1, wake the thread. \return the ast_sched_add() result */
+int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+		const void *data);
+/*! \brief Same as ast_sched_thread_add(), but goes through ast_sched_add_variable() and passes \a variable along unchanged */
+int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+		const void *data, int variable);
+/*! \brief Get the sched_context that belongs to the scheduler thread */
+struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st);
+/*! \brief Run AST_SCHED_DEL() for \a id on the thread's context; evaluates to that macro's result. \a id must be an lvalue, it is reset to -1 */
+#define ast_sched_thread_del(st, id) ({ \
+	struct sched_context *__tmp_context = ast_sched_thread_get_context(st); \
+	AST_SCHED_DEL(__tmp_context, id); \
+})
+/*! \brief Forward to ast_sched_when() for entry \a id on the thread's context. \return the ast_sched_when() result */
+long ast_sched_thread_when(struct ast_sched_thread *st, int id);
+/*! \brief Signal the scheduler thread's condition so that it wakes up from its wait */
+void ast_sched_thread_poke(struct ast_sched_thread *st);
+
 #if defined(__cplusplus) || defined(c_plusplus)
 }
 #endif

Modified: team/russell/sched_thread2/main/sched.c
URL: http://svn.digium.com/svn-view/asterisk/team/russell/sched_thread2/main/sched.c?view=diff&rev=170835&r1=170834&r2=170835
==============================================================================
--- team/russell/sched_thread2/main/sched.c (original)
+++ team/russell/sched_thread2/main/sched.c Fri Jan 23 23:29:34 2009
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2008, Digium, Inc.
  *
  * Mark Spencer <markster at digium.com>
  *
@@ -70,6 +70,155 @@
 #endif
 };
 
+struct ast_sched_thread {
+	pthread_t thread;	/*!< Thread running sched_run(); AST_PTHREADT_NULL while no thread is running */
+	ast_mutex_t lock;	/*!< Held while setting the stop flag and while waiting on / signaling cond */
+	ast_cond_t cond;	/*!< Signaled to wake the thread: entry added, poke, or stop request */
+	struct sched_context *context;	/*!< Scheduler context the thread waits on and runs */
+	unsigned int stop:1;	/*!< Set by ast_sched_thread_destroy() to make the thread exit */
+};
+
+static void *sched_run(void *data)
+{
+	struct ast_sched_thread *st = data;
+	/* Thread body: until st->stop is set, wait for the next entry to come due (or for a signal on st->cond), then run the queue */
+	while (!st->stop) {
+		int ms;
+		struct timespec ts = {
+			.tv_sec = 0,	
+		};
+
+		ast_mutex_lock(&st->lock);
+		/* Re-check under the lock: a stop request may have been made since the loop test */
+		if (st->stop) {
+			ast_mutex_unlock(&st->lock);
+			return NULL;
+		}
+
+		ms = ast_sched_wait(st->context);
+		/* -1: wait with no timeout until signaled (presumably nothing is scheduled -- confirm against ast_sched_wait()) */
+		if (ms == -1) {
+			ast_cond_wait(&st->cond, &st->lock);
+		} else {	/* otherwise wait until the absolute deadline now + ms, or until signaled */
+			struct timeval tv;
+			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
+			ts.tv_sec = tv.tv_sec;
+			ts.tv_nsec = tv.tv_usec * 1000;	/* microseconds -> nanoseconds */
+			ast_cond_timedwait(&st->cond, &st->lock, &ts);
+		}
+
+		ast_mutex_unlock(&st->lock);
+		/* NOTE(review): st->stop is read here (and in the loop test) without st->lock held, while it is written under the lock -- confirm this is intended */
+		if (st->stop) {
+			return NULL;
+		}
+		/* The lock is not held while the scheduler context runs its queue */
+		ast_sched_runq(st->context);
+	}
+
+	return NULL;
+}
+
+void ast_sched_thread_poke(struct ast_sched_thread *st)
+{
+	ast_mutex_lock(&st->lock);	/* the signal is sent with st->lock held, the same lock sched_run() waits under */
+	ast_cond_signal(&st->cond);	/* wakes sched_run() out of ast_cond_wait() / ast_cond_timedwait() */
+	ast_mutex_unlock(&st->lock);
+}
+
+long ast_sched_thread_when(struct ast_sched_thread *st, int id)
+{
+	return ast_sched_when(st->context, id);	/* thin wrapper: forwards to ast_sched_when() on the thread's context */
+}
+
+struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
+{
+	return st->context;	/* plain accessor; the ast_sched_thread_del() macro uses it to reach the context */
+}
+
+struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
+{
+	if (st->thread != AST_PTHREADT_NULL) {	/* a thread is running: ask it to stop and wait for it; note st is dereferenced unconditionally, so it must not be NULL */
+		ast_mutex_lock(&st->lock);
+		st->stop = 1;
+		ast_cond_signal(&st->cond);	/* wake the thread if it is blocked in its wait so it can see the stop flag */
+		ast_mutex_unlock(&st->lock);
+		pthread_join(st->thread, NULL);
+		st->thread = AST_PTHREADT_NULL;
+	}
+	/* Any thread has been joined by now, so the lock and condition are no longer in use */
+	ast_mutex_destroy(&st->lock);
+	ast_cond_destroy(&st->cond);
+	/* context is NULL when called from ast_sched_thread_create() after sched_context_create() failed */
+	if (st->context) {
+		sched_context_destroy(st->context);
+		st->context = NULL;
+	}
+
+	ast_free(st);
+	/* Always NULL, so callers can write: st = ast_sched_thread_destroy(st); */
+	return NULL;
+}
+
+struct ast_sched_thread *ast_sched_thread_create(void)
+{
+	struct ast_sched_thread *st;
+	/* Allocate a zeroed ast_sched_thread, create its scheduler context and start sched_run() on it; returns NULL on any failure */
+	if (!(st = ast_calloc(1, sizeof(*st)))) {
+		return NULL;
+	}
+
+	ast_mutex_init(&st->lock);
+	ast_cond_init(&st->cond, NULL);
+	/* Mark "no thread yet" so ast_sched_thread_destroy() skips the stop/join on the failure paths below */
+	st->thread = AST_PTHREADT_NULL;
+
+	if (!(st->context = sched_context_create())) {
+		ast_log(LOG_ERROR, "Failed to create scheduler\n");
+		ast_sched_thread_destroy(st);	/* releases the lock, the condition and st itself */
+		return NULL;
+	}
+	
+	if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
+		ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
+		ast_sched_thread_destroy(st);	/* also destroys the context created above */
+		return NULL;
+	}
+
+	return st;
+}
+
+int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+		const void *data, int variable)
+{
+	int res;
+	/* Add the entry to the thread's context; "variable" is handed to ast_sched_add_variable() unchanged */
+	res = ast_sched_add_variable(st->context, when, cb, data, variable);
+	/* Unless the add returned -1, wake the thread so that its next ast_sched_wait() takes the new entry into account */
+	if (res != -1) {
+		ast_mutex_lock(&st->lock);
+		ast_cond_signal(&st->cond);
+		ast_mutex_unlock(&st->lock);
+	}
+
+	return res;	/* whatever ast_sched_add_variable() returned, -1 included */
+}
+
+int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+		const void *data)
+{
+	int res;
+	/* Add the entry to the thread's context with ast_sched_add() */
+	res = ast_sched_add(st->context, when, cb, data);
+	/* Unless the add returned -1, wake the thread so that its next ast_sched_wait() takes the new entry into account */
+	if (res != -1) {
+		ast_mutex_lock(&st->lock);
+		ast_cond_signal(&st->cond);
+		ast_mutex_unlock(&st->lock);
+	}
+
+	return res;	/* whatever ast_sched_add() returned, -1 included */
+}
 
 /* hash routines for sched */
 




More information about the asterisk-commits mailing list