[Asterisk-Dev] RFC: New Jitter Buffer

Steve Kann stevek at stevek.com
Tue Nov 23 08:09:08 MST 2004



Hi List,

    As I talked about a few weeks ago on this list, and with others 
since astricon, I've been planning on writing a modular jitterbuffer 
(called "playout buffer" in most papers).  The basic idea is to have a 
module that can be used for all VoIP channels in asterisk, as well as 
with iaxclient and libiax2.   Depending on where it goes inside 
asterisk, this could mean putting it in chan_iax2.c [replacing the 
present implementation], and rtp.c, or it could actually go somewhere 
else like channel.c.

    There are lots of details on the design goals at
http://www.voip-info.org/tiki-index.php?page=Asterisk+new+jitterbuffer

The highlighted features are:

1) Packet Loss Concealment:  It can tell callers when to interpolate a 
lost frame, to conceal either lost frames or jitterbuffer growth.  
Tested with speex so far, this works really well.

2) An efficient queuing mechanism:  It does handle the packet queue 
internally, but uses a circular list, so that in the general case 
(without reordered packets), you always add a frame to one end, and take 
it off the other end, so it scales to any jitterbuffer length with no 
extra overhead.

3) Smooth growth and shrinking:  Since the present chan_iax2 and libiax2 
jitterbuffer mechanism only has control over scheduling delivery times, 
it cannot smoothly shrink packets already queued.  It also cannot shrink 
quickly, because it may then cause misordering.  It also cannot spread 
growth out amongst several frames in the queue.  This buffer keeps the 
frames internally, and its scheduling is done mainly at the "output" side.

4) Statistics:  These can be used to display locally, and send to the 
other side of the conversation.  When transmitted, both sides will know 
the parameters of communications in both directions, and can display 
even better metrics.  Also, at some point, these could be used to change 
the way communication happens (for example, try using FEC if it seems we 
have high loss not related to bandwidth, etc).

5) It should be pretty easy to tie into wherever it needs to go in 
asterisk.  It is pretty easily tied into chan_iax2 and iaxclient.  
Although the basic design is to "poll" the buffer for frames to come 
out, it can always tell you when it thinks the next frame should be 
coming out.  So, you can set up an ast_sched task to get the next frame 
at the correct time, and adjust that task whenever you add/remove a 
frame, so it should always be correct.
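
To make the queuing and "when is the next frame due" ideas above a bit 
more concrete, here is a minimal standalone sketch.  It is not the code 
from the patch: the demo_* names are made up for illustration, a frame 
carries only a timestamp, and I've assumed a frame is "due" once its ts 
is at or before the time asked for.  It shows a circular, doubly linked 
list kept in ts order, where in-order arrivals only touch the tail.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical types for illustration; not the patch's jb_frame/jitterbuf. */
typedef struct demo_frame {
    long ts;                          /* sender timestamp, in ms */
    struct demo_frame *next, *prev;
} demo_frame;

typedef struct demo_queue {
    demo_frame *head;                 /* lowest ts; head->prev is the highest ts */
} demo_queue;

/* Insert in ts order.  With in-order arrival the loop body never runs. */
int demo_put(demo_queue *q, long ts) {
    demo_frame *f, *p;
    assert(q != NULL);
    f = malloc(sizeof(*f));
    if (!f) return -1;
    f->ts = ts;
    if (!q->head) {                   /* empty queue: frame points at itself */
        f->next = f->prev = f;
        q->head = f;
        return 0;
    }
    /* Walk back from the tail until we find a frame that is not newer. */
    p = q->head->prev;
    while (p->ts > ts && p != q->head)
        p = p->prev;
    if (p->ts > ts) {                 /* older than everything: becomes the head */
        f->next = p;
        f->prev = p->prev;
        q->head = f;
    } else {                          /* goes right after p */
        f->prev = p;
        f->next = p->next;
    }
    f->prev->next = f;
    f->next->prev = f;
    return 0;
}

/* Pop the oldest frame if it is due (ts <= now).  Returns 1 on success. */
int demo_get(demo_queue *q, long now, long *ts_out) {
    demo_frame *f;
    assert(q != NULL && ts_out != NULL);
    f = q->head;
    if (!f || f->ts > now) return 0;
    if (f->next == f) {
        q->head = NULL;
    } else {
        f->prev->next = f->next;
        f->next->prev = f->prev;
        q->head = f->next;
    }
    *ts_out = f->ts;
    free(f);
    return 1;
}

/* When is the next frame due?  0 means empty, as jb_next() documents. */
long demo_next(const demo_queue *q) {
    assert(q != NULL);
    return q->head ? q->head->ts : 0;
}
```

A caller would re-read demo_next() after every put or get and reschedule 
its timer to that value, which is the same pattern as adjusting the 
scheduler task described in the last item above.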

Now, on to the present implementation:

What I have here is rather basic, and not quite complete, but I'm 
looking for comments on its design first, and then details second.
This code, as applied to iaxclient-cvs, yields a working example.  (I 
can provide Linux binaries if you're interested).


It's certainly not perfect, but I've got PLC going with it, it's 
modular, and it was pretty easy to hook up to libiax2 (and would go into 
chan_iax2 pretty much the same way).

I have hacked up a version of testcall that you can use to simulate some 
adverse network conditions:

keys 1-5 <plus enter>: set packet loss rate to 0,5,10,15,20 percent.
keys 6-9 <enter>: set jitter to 0,250,500,750,1000 ms (IIRC).

The jitter-creating mechanism isn't the best, but it was really just a 
quick hack.
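
For anyone who wants to build a similar test harness, the sketch below 
shows one simple way to simulate loss and jitter.  It is an assumption 
about how such a hack could look, not the actual testcall code: the 
net_sim struct and sim_* functions are hypothetical names, and the 
caller is expected to seed rand() and to do the actual dropping or 
delaying (for example, inside a sendto replacement installed with 
iaxc_set_networking from the patch).

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical simulator settings; names are for illustration only. */
typedef struct net_sim {
    int loss_pct;     /* packet loss rate, 0..100 percent */
    int jitter_ms;    /* maximum extra delay per packet, in ms */
} net_sim;

/* Returns 1 if this packet should be dropped, 0 if it should be sent. */
int sim_should_drop(const net_sim *s) {
    assert(s != NULL && s->loss_pct >= 0 && s->loss_pct <= 100);
    /* rand() % 100 is 0..99, so 0 never drops and 100 always drops. */
    return (rand() % 100) < s->loss_pct;
}

/* Returns an extra delay in ms, roughly uniform over 0..jitter_ms. */
int sim_delay_ms(const net_sim *s) {
    assert(s != NULL && s->jitter_ms >= 0);
    if (s->jitter_ms == 0) return 0;
    return rand() % (s->jitter_ms + 1);
}
```

The modulo introduces a slight bias, which doesn't matter much for a 
quick test, but a uniform delay is still a crude model of real network 
jitter.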

PLC presently only works with codec_speex, but from the diff, you'll see 
it will be pretty easy to add to other codecs, although we'll need to 
write it ourselves for GSM, G.711, and friends.
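
As a rough idea of what a hand-written PLC could look like for codecs 
that decode to linear PCM, here is a deliberately naive sketch: repeat 
the last good frame, halving its amplitude for every consecutive lost 
frame.  This is my own illustration, not something in the patch, and not 
the algorithm from any standard; the plc_* names and the 160-sample 
frame size (20 ms at 8 kHz) are assumptions.

```c
#include <assert.h>
#include <stddef.h>

#define PLC_FRAME 160   /* assumed: 20 ms of 8 kHz audio per frame */

/* Hypothetical concealment state; zero-initialize before first use. */
typedef struct plc_state {
    short last[PLC_FRAME];   /* most recent output frame */
    int   lost_run;          /* consecutive concealed frames so far */
} plc_state;

/* Call for every normally decoded frame to remember it. */
void plc_good_frame(plc_state *st, const short *pcm) {
    size_t i;
    assert(st != NULL && pcm != NULL);
    for (i = 0; i < PLC_FRAME; i++)
        st->last[i] = pcm[i];
    st->lost_run = 0;
}

/* Call when the decoder is handed a zero-length (lost) frame. */
void plc_conceal(plc_state *st, short *out) {
    size_t i;
    assert(st != NULL && out != NULL);
    st->lost_run++;
    for (i = 0; i < PLC_FRAME; i++) {
        /* fade by half per lost frame so long gaps decay to silence */
        st->last[i] = (short)(st->last[i] / 2);
        out[i] = st->last[i];
    }
}
```

In the ulaw decoder from the patch, plc_conceal() would go where the 
"uLaw interpolate: TODO" message is printed, with plc_good_frame() 
called after a normal decode.  A real implementation would want pitch 
repetition and smoothing at the frame boundaries.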

If you look at the patch, it also includes an aborted integration into 
iaxclient_lib.c;  I decided instead to work with libiax2/src/iax.c, 
because that's the most similar to chan_iax2.c, making a better 
proof-of-concept for integration into asterisk.

Also, in the jitterbuf.c/jitterbuf.h there are big sections marked #ifdef 
SPEEX.  These are an alternate implementation based on Jean-Marc Valin's 
jitterbuffer in libspeex.  The present implementation is a bit more like 
the present IAX jitterbuffers.
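
The non-SPEEX implementation estimates jitter from a history of 
per-frame delays (receiver's "now" minus the sender's ts) and takes the 
spread between the largest and smallest.  A minimal sketch of that 
measurement follows.  The delay_spread name is hypothetical, and unlike 
maxdiff() in the patch, which starts both max and min at 0, this version 
seeds them from the first sample so that a constant clock offset between 
the two sides doesn't count as jitter.  That seeding is my variation, 
not the patch's behaviour.

```c
#include <assert.h>
#include <stddef.h>

/* Spread (max - min) of the observed delays in ms; 0 for an empty window. */
long delay_spread(const long *delays, size_t n) {
    long lo, hi;
    size_t i;
    assert(delays != NULL || n == 0);
    if (n == 0) return 0;
    lo = hi = delays[0];              /* seed from data, not from 0 */
    for (i = 1; i < n; i++) {
        if (delays[i] > hi) hi = delays[i];
        if (delays[i] < lo) lo = delays[i];
    }
    return hi - lo;
}
```

With delays of 500, 520 and 510 ms this reports 20 ms of jitter rather 
than 520 ms.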

So, without further ado, the patch is attached.


-------------- next part --------------
Index: Makefile
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/Makefile,v
retrieving revision 1.34
diff -u -w -r1.34 Makefile
--- Makefile	15 Nov 2004 23:26:54 -0000	1.34
+++ Makefile	23 Nov 2004 00:42:37 -0000
@@ -9,7 +9,7 @@
 SPEEX_EC=1
 MEC2_EC=0
 
-CFLAGS=-Igsm/inc \
+CFLAGS= -DNEWJB -I. -Igsm/inc \
 	-Iportaudio/pa_common -Iportaudio/pablio -Iportmixer/px_common \
 	-Ilibspeex/include
 
@@ -23,7 +23,7 @@
 
 # debug flags
 # CFLAGS := $(CFLAGS) -DDEBUG_SUPPORT -DDEBUG_DEFAULT -DEXTREME_DEBUG -D_DEBUG 
-# CFLAGS := $(CFLAGS) -DDEBUG_SUPPORT -DDEBUG_DEFAULT -D_DEBUG 
+CFLAGS := $(CFLAGS) -DDEBUG_SUPPORT -DDEBUG_DEFAULT -D_DEBUG 
 
 OBJS=\
 	gsm/src/add.o \
@@ -54,6 +54,7 @@
 	codec_ulaw.o \
 	codec_alaw.o \
 	codec_speex.o \
+	jitterbuf.o \
 	iaxclient_lib.o 
 
 OBJS_LIBSPEEX=\
Index: audio_encode.c
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/audio_encode.c,v
retrieving revision 1.30
diff -u -w -r1.30 audio_encode.c
--- audio_encode.c	16 Nov 2004 18:59:00 -0000	1.30
+++ audio_encode.c	23 Nov 2004 00:42:37 -0000
@@ -275,10 +275,7 @@
 	/* update last output timestamp */
 	gettimeofday( &timeLastOutput, NULL ) ;
 
-	if(len == 0) {
-		fprintf(stderr, "Empty voice frame\n");
-		return -1;
-	}
+	//if(len == 0) fprintf(stderr, "Interpolation voice frame\n");
 
 	if(format == 0) {
 		fprintf(stderr, "decode_audio: Format is zero (should't happen)!\n");
Index: codec_speex.c
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/codec_speex.c,v
retrieving revision 1.3
diff -u -w -r1.3 codec_speex.c
--- codec_speex.c	16 Nov 2004 18:59:00 -0000	1.3
+++ codec_speex.c	23 Nov 2004 00:42:37 -0000
@@ -41,6 +41,14 @@
 
     struct state * decstate = (struct state *) c->decstate;
 
+    if(*inlen == 0) {
+	//return 0;
+	//fprintf(stderr, "Speex Interpolate\n");
+	speex_decode_int(decstate->state, NULL, out);
+	*outlen -= 160;
+	return 0;
+    }
+
     /* XXX if the input contains more than we can read, we lose here */
     speex_bits_read_from(&decstate->bits, in, *inlen);
     *inlen = 0; 
Index: codec_ulaw.c
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/codec_ulaw.c,v
retrieving revision 1.2
diff -u -w -r1.2 codec_ulaw.c
--- codec_ulaw.c	14 Oct 2004 19:21:55 -0000	1.2
+++ codec_ulaw.c	23 Nov 2004 00:42:37 -0000
@@ -85,12 +85,15 @@
 static int decode ( struct iaxc_audio_codec *c, 
     int *inlen, char *in, int *outlen, short *out ) {
 
+    if(*inlen == 0) {
+	fprintf(stderr, "uLaw interpolate: TODO\n");
+    }
+
     while ((*inlen > 0) && (*outlen > 0)) {
 	*(out++) = ulaw_2lin[(unsigned char)*(in++)];
 	(*inlen)--; (*outlen)--;
     }
 
-
     return 0;
 }
 
Index: iaxclient.h
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/iaxclient.h,v
retrieving revision 1.35
diff -u -w -r1.35 iaxclient.h
--- iaxclient.h	16 Nov 2004 18:59:00 -0000	1.35
+++ iaxclient.h	23 Nov 2004 00:42:37 -0000
@@ -16,8 +16,23 @@
 #include <stdio.h>
 #ifdef WIN32
 #include <windows.h>
+#include <winsock.h>
+#else
+#include <sys/socket.h>
 #endif
 
+#ifdef WIN32
+#if defined(_MSC_VER)
+	typedef int (__stdcall *iaxc_sendto_t)(SOCKET, const char *, int, int, const struct sockaddr *, int);
+#else
+	typedef int PASCAL (*iaxc_sendto_t)(SOCKET, const char *, int, int, const struct sockaddr *, int);
+#endif
+#else
+	typedef int (*iaxc_sendto_t)(int, const void *, size_t, int, const struct sockaddr *, socklen_t);
+	typedef int (*iaxc_recvfrom_t)(int s, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen);
+#endif
+
+
 /* Define audio type constants */
 #define AUDIO_INTERNAL 0
 #define AUDIO_INTERNAL_PA 1
@@ -128,6 +143,11 @@
 int iaxc_mic_boost_get( void ) ;
 int iaxc_mic_boost_set( int enable ) ;
 
+/* application-defined networking; give substitute sendto and recvfrom functions,
+ * must be called before iaxc_initialize! */
+void iaxc_set_networking(iaxc_sendto_t st, iaxc_recvfrom_t rf) ;
+
+
 #define IAXC_AD_INPUT           (1<<0)
 #define IAXC_AD_OUTPUT          (1<<1)
 #define IAXC_AD_RING            (1<<2)
Index: iaxclient_lib.c
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/iaxclient_lib.c,v
retrieving revision 1.65
diff -u -w -r1.65 iaxclient_lib.c
--- iaxclient_lib.c	15 Nov 2004 20:06:50 -0000	1.65
+++ iaxclient_lib.c	23 Nov 2004 00:42:37 -0000
@@ -9,6 +9,8 @@
 #define DEFAULT_CALLERID_NAME    "Not Available"
 #define DEFAULT_CALLERID_NUMBER  "7005551212"
 
+#undef NEWJB
+
 struct iaxc_registration {
     struct iax_session *session;
     int firstpass;
@@ -48,6 +50,11 @@
 static void do_iax_event();
 static int service_audio();
 
+/* external global networking replacements */
+static iaxc_sendto_t   iaxc_sendto = sendto;
+static iaxc_recvfrom_t iaxc_recvfrom = recvfrom;
+
+
 static THREAD procThread;
 #ifdef WIN32
 static THREADID procThreadID;
@@ -185,6 +192,10 @@
       // XXX libiax should handle cleanup, I think..
       calls[toDump].state = IAXC_CALL_STATE_FREE;
       calls[toDump].session = NULL;
+#ifdef NEWJB
+      /* NEWJB: clear out frames! */
+      jb_destroy(calls[toDump].jb);
+#endif
       iaxc_do_state_callback(toDump);
 }
 
@@ -233,6 +244,11 @@
 	return selected_call;
 }
 
+void iaxc_set_networking(iaxc_sendto_t st, iaxc_recvfrom_t rf) {
+    iaxc_sendto = st;
+    iaxc_recvfrom = rf;
+}
+
 // Parameters:
 // audType - Define whether audio is handled by library or externally
 int iaxc_initialize(int audType, int inCalls) {
@@ -244,11 +260,15 @@
 
 	MUTEXINIT(&iaxc_lock);
 
+	if(iaxc_sendto == sendto) {
 	if ( (port = iax_init(0) < 0)) {
 		iaxc_usermsg(IAXC_ERROR, "Fatal error: failed to initialize iax with port %d", port);
 		return -1;
 	}
 	netfd = iax_get_fd();
+	} else {
+	    iax_set_networking(iaxc_sendto, iaxc_recvfrom);
+	}
 
 	nCalls = inCalls;
 	/* initialize calls */
@@ -606,28 +626,21 @@
 
 	samples = bufsize;
 
-#ifdef IAXC_IAX2
-	while(total_consumed < e->datalen) {
+	do {
 		cur = decode_audio(call, fr + (bufsize - samples),
 		    e->data+total_consumed,e->datalen-total_consumed,
 		    call->format, &samples);
-#else
-	while(total_consumed < e->event.voice.datalen) {
-		cur = decode_audio(call, fr,
-		    e->event.voice.data+total_consumed,e->event.voice.datalen-total_consumed,
-		    call.format, &samples);
-#endif
+
 		if(cur < 0) {
 			iaxc_usermsg(IAXC_STATUS, "Bad or incomplete voice packet.  Unable to decode. dropping");
 			return;
-		} else {  /* its an audio packet to be output to user */
-			total_consumed += cur;
-
-			if(iaxc_audio_output_mode != 0) continue;
+		}
 
+		total_consumed += cur;
+		if(iaxc_audio_output_mode != 0) 
+		    continue;
 			audio.output(&audio,fr,bufsize-samples);
-		}
-	}
+	} while(total_consumed < e->datalen);
 }
 
 
@@ -668,7 +681,12 @@
 			// notify the user?
  			break;
 		case IAX_EVENT_VOICE:
+#ifdef NEWJB
+			/* NEWJB: timestamp! */
+			jb_put(calls[callNo].jb,e,160,e->ts,1000000);
+#else
 			handle_audio_event(e, callNo);
+#endif
 			break;
 		case IAX_EVENT_TEXT:
 			handle_text_event(e, callNo);
@@ -755,6 +773,10 @@
 	calls[callNo].encoder = 0;
 	calls[callNo].decoder = 0;
 
+#ifdef NEWJB
+	calls[callNo].jb = jb_new();
+#endif
+
 	if(ext) {
 	    strncpy(calls[callNo].remote_name, num, IAXC_EVENT_BUFSIZ); 
     	    strncpy(calls[callNo].remote,    ++ext, IAXC_EVENT_BUFSIZ);
@@ -1065,6 +1087,10 @@
 			calls[callNo].session = e->session;
 			calls[callNo].state = IAXC_CALL_STATE_ACTIVE|IAXC_CALL_STATE_RINGING;
 
+#ifdef NEWJB
+			calls[callNo].jb = jb_new();
+#endif
+
 
 			iax_accept(calls[callNo].session,format);
 			iax_ring_announce(calls[callNo].session);
Index: iaxclient_lib.h
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/iaxclient_lib.h,v
retrieving revision 1.37
diff -u -w -r1.37 iaxclient_lib.h
--- iaxclient_lib.h	8 Nov 2004 17:56:32 -0000	1.37
+++ iaxclient_lib.h	23 Nov 2004 00:42:37 -0000
@@ -26,6 +26,8 @@
 #include "speex/speex_preprocess.h"
 #endif
 
+#include "jitterbuf.h"
+
 #include <stdio.h>
 
 
@@ -163,6 +165,11 @@
 	struct 	 timeval 	last_activity;
 	struct 	 timeval 	last_ping;
 
+#ifdef NEWJB
+	/* jitterbuffer */
+	jitterbuf *jb;
+#endif
+
 	/* our negotiated format */
 	int format;
 
Index: jitterbuf.c
===================================================================
RCS file: jitterbuf.c
diff -N jitterbuf.c
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ jitterbuf.c	23 Nov 2004 00:42:37 -0000
@@ -0,0 +1,582 @@
+/*
+ * jitterbuf: an application-independent jitterbuffer
+ *
+ * Copyrights:
+ * Copyright (C) 2004, Horizon Wimba, Inc.
+ *
+ * Contributors:
+ * Steve Kann <stevek at stevek.com>
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU Lesser (Library) General Public License
+ */
+
+#include "jitterbuf.h"
+#include "stdio.h"
+#include "stdlib.h"
+#include "string.h"
+
+#define jb_warn(...) fprintf(stderr, __VA_ARGS__)
+#define jb_err(...)  fprintf(stderr, __VA_ARGS__)
+//#define jb_dbg(...)  fprintf(stderr, __VA_ARGS__)
+#define jb_dbg(...)  
+
+#ifdef SPEEX
+#define LATE_BINS 4
+#endif
+
+jitterbuf * jb_new() {
+    jitterbuf *jb;
+
+
+    jb = malloc(sizeof(jitterbuf));
+    if(!jb) return NULL;
+
+
+    memset(jb,0,sizeof(jitterbuf));
+
+
+#ifdef SPEEX
+    /* XXX - fixme */
+    jb->frame_time = 20;
+    jb->buffer_size = 4;
+    jb->pointer_timestamp = -jb->frame_time * jb->buffer_size;
+#endif
+
+    /* initialize length */
+    jb->info.length = 100; 
+    jb->info.silence = 1; 
+
+    jb_dbg("jb_new() = %x\n", jb);
+    return jb;
+}
+
+void jb_destroy(jitterbuf *jb) {
+    jb_dbg("jb_destroy(%x)\n", jb);
+    jb_err("FIXME: destroy\n");
+}
+
+/* simple history manipulation */
+/* maybe later we can make the history buckets variable size, or something? */
+static inline long maxdiff(long *arr, int n) {
+    long max = 0;
+    long min = 0;
+    while(n-- > 0) {
+	if(arr[n] > max) max = arr[n];
+	else if(arr[n] < min) min = arr[n];
+    }
+    return max-min;
+}
+
+static void history_put(jitterbuf *jb, long ts, long now) {
+
+    /* don't add special/negative times to history */
+    if(ts <= 0) return;
+
+    /* rotate long-term history as needed */
+    while( (now - jb->hist_ts) > 1000) 
+    {
+	jb->hist_ts += 1000;
+
+	/* rotate long-term history */
+	memmove(&(jb->hist_long[1]), &(jb->hist_long[0]), (JB_HISTORY_SECONDS-1) * sizeof(jb->hist_long[0]));
+
+	/* move current short-term history to long-term */
+	jb->hist_long[0] = maxdiff(jb->hist_short, jb->hist_shortcur);
+	jb_dbg("history: rotating, short-term was %ld\n", jb->hist_long[0]);
+	jb_warninfo(jb);
+	//jb_warnqueue(jb);
+
+	/* clear short-term */
+	jb->hist_shortcur = 0;
+    }
+
+    /* add this entry, if it's in the window */
+    if(jb->hist_shortcur < JB_HISTORY_MAXPERSEC)
+	jb->hist_short[jb->hist_shortcur++] = now - ts;
+
+}
+
+static long history_get(jitterbuf *jb) {
+    long lhist;
+    long shist;
+    long hist;
+
+    shist = maxdiff(jb->hist_short, jb->hist_shortcur);
+    lhist = maxdiff(jb->hist_long, JB_HISTORY_SECONDS);
+  
+    if(shist > lhist) 
+      hist = shist;
+    else 
+      hist = lhist;
+
+
+    jb_dbg("history: long=%ld short=%ld total=%ld\n", lhist, shist, hist);
+
+    return hist;
+}
+
+static void queue_put(jitterbuf *jb, void *data, int type, long ms, long ts) {
+    jb_frame *frame;
+    jb_frame *p;
+
+    frame = jb->free;
+    if(frame) {
+	jb->free = frame->next;
+    } else {
+	frame = malloc(sizeof(jb_frame));
+    }
+
+    if(!frame) {
+	jb_err("cannot allocate frame\n");
+	return;
+    }
+
+    jb->info.frames_cur++;
+
+    frame->data = data;
+    frame->ts = ts;
+    frame->ms = ms;
+    frame->type = type;
+
+    /* 
+     * frames are a circular list, jb->frames points to the lowest ts, 
+     * jb->frames->prev points to the highest ts
+     */
+
+    if(!jb->frames) {  /* queue is empty */
+	jb->frames = frame;
+	frame->next = frame;
+	frame->prev = frame;
+    } else if(ts < jb->frames->ts) {
+	frame->next = jb->frames;
+	frame->prev = jb->frames->prev;
+
+	frame->next->prev = frame;
+	frame->prev->next = frame;
+
+	jb->frames = frame;
+    } else {
+	p = jb->frames;
+
+	while(ts < p->prev->ts && p->prev != jb->frames) 
+	    p = p->prev;
+
+	frame->next = p;
+	frame->prev = p->prev;
+
+	frame->next->prev = frame;
+	frame->prev->next = frame;
+    }
+}
+
+jb_frame *queue_get(jitterbuf *jb, long ts) {
+    jb_frame *frame;
+    frame = jb->frames;
+
+    if(!frame)
+	return NULL;
+
+    //jb_warn("queue_get: ASK %ld FIRST %ld\n", ts, frame->ts);
+
+    if(ts > frame->ts) {
+	/* remove this frame */
+	frame->prev->next = frame->next;
+	frame->next->prev = frame->prev;
+
+	if(frame->next == frame)
+	  jb->frames = NULL;
+	else
+	  jb->frames = frame->next;
+
+
+	/* insert onto "free" single-linked list */
+	frame->next = jb->free;
+	jb->free = frame;
+
+	jb->info.frames_cur--;
+
+	/* we return the frame pointer, even though it's on free list, 
+	 * but caller must copy data */
+	return frame;
+    } 
+
+    return NULL;
+}
+
+/* some diagnostics */
+static void jb_dbginfo(jitterbuf *jb) {
+    jb_dbg("jb info: fin=%ld fout=%ld flate=%ld flost=%ld fcur=%ld jitter=%ld length=%ld drift=%ld silence=%d\n",
+	    jb->info.frames_in, jb->info.frames_out, jb->info.frames_late, jb->info.frames_lost,
+	    jb->info.frames_cur, jb->info.jitter, jb->info.length, jb->info.drift, jb->info.silence);
+}
+
+static void jb_warninfo(jitterbuf *jb) {
+    jb_warn("jb info: fin=%ld fout=%ld flate=%ld flost=%ld fcur=%ld jitter=%ld length=%ld drift=%ld silence=%d\n",
+	    jb->info.frames_in, jb->info.frames_out, jb->info.frames_late, jb->info.frames_lost,
+	    jb->info.frames_cur, jb->info.jitter, jb->info.length, jb->info.drift, jb->info.silence);
+    if(jb->info.frames_in > 0) 
+	jb_warn("jb info: Loss PCT = %ld%%, Late PCT = %ld%%\n",
+	    jb->info.frames_lost * 100/jb->info.frames_in, 
+	    jb->info.frames_late * 100/jb->info.frames_in);
+}
+
+static void jb_chkqueue(jitterbuf *jb) {
+    int i=0;
+    jb_frame *p = jb->frames;
+
+    if(!p) {
+      return;
+    }
+
+    do {
+	if(p->next == NULL)  {
+	  jb_err("Queue is BROKEN at item [%d]", i);	
+	}
+	i++;
+	p=p->next;
+    } while (p != jb->frames);
+}
+
+static void jb_warnqueue(jitterbuf *jb) {
+    int i=0;
+    jb_frame *p = jb->frames;
+
+    jb_warn("queue: ");
+
+    if(!p) {
+      jb_warn("EMPTY\n");
+      return;
+    }
+
+    do {
+	jb_warn("[%d]=%ld ", i++, p->ts);
+	p=p->next;
+    } while (p != jb->frames);
+
+    jb_warn("\n");
+
+
+}
+
+static void jb_adjust(jitterbuf *jb, int now) {
+
+    long diff;
+    long adjustment;
+
+    jb->info.jitter = history_get(jb);
+    
+    diff = (jb->info.jitter) - jb->info.length;
+
+    if(jb->info.silence) {
+	if(diff > 0)
+	{
+	    /* just grow as we want, we'll only be expanding the silence a bit */
+	    adjustment = diff;
+	} else { /* we can shrink quickly here, as long as we have silence in the queue */
+	    /* find out the gap (negative) between what we last sent, and the next frame in the queue
+	     * it might not be voice, but then we'll adjust again later */
+	    adjustment = (jb->info.last_ts ) - jb_next(jb); 
+
+	    /* of course, clamp our adjustment at our goal */
+	    if(adjustment < diff) adjustment = diff;
+	}
+    } else {
+	if(diff < 0) {
+	    /* shrink at about a 1 in 10 rate.  We'll drop 10% of frames, which is pretty unnoticeable,
+	     * but we get the latency down quickly */
+	    adjustment = (jb->info.last_adjustment - now)/10;
+	    if(adjustment < diff) adjustment = diff;
+	} else {
+	    /* adjust at a rate such that we get the full desired adjustment before the end of the buffer */
+	    /* XXX: Is this a reasonable way to grow?? */
+	    adjustment = diff * (now - jb->info.last_adjustment) / (jb->info.length + now - jb->info.last_adjustment);	
+	}
+    }
+
+    jb->info.length += adjustment;
+    jb->info.last_adjustment = now;
+}
+
+int jb_put(jitterbuf *jb, void *data, int type, long ms, long ts, long now) {
+
+    jb_dbg("jb_put(%x,%x,%ld,%ld,%ld)\n", jb, data, ms, ts, now);
+
+    jb->info.frames_in++;
+
+#ifdef SPEEX
+    {
+	int arrival_margin;
+	/* reset state */
+        /* cleanup old (unnecessary?) */
+	/*Find an empty slot in the buffer*/
+
+	/* Copy packet in buffer */
+	queue_put(jb,data,type,ms,ts);
+
+	/* Adjust the buffer size depending on network conditions */
+	arrival_margin = (ts - jb->pointer_timestamp - jb->frame_time);
+	if (arrival_margin >= -LATE_BINS*jb->frame_time)
+	{
+	    int int_margin;
+	    int i;
+	    for (i=0;i<MAX_MARGIN;i++)
+	    {
+	       jb->shortterm_margin[i] *= .98;
+	       jb->longterm_margin[i] *= .995;
+	    }
+	    int_margin = (arrival_margin + LATE_BINS*jb->frame_time)/jb->frame_time;
+	    if (int_margin>MAX_MARGIN-1)
+	       int_margin = MAX_MARGIN-1;
+	    if (int_margin>=0)
+	    {
+	       jb->shortterm_margin[int_margin] += .02;
+	       jb->longterm_margin[int_margin] += .005;
+	    }
+	}
+    }
+#else
+
+    
+    if(type == JB_TYPE_VOICE) {
+      /* presently, I'm only adding VOICE frames to history and drift calculations; mostly because with the
+       * IAX integrations, I'm sending retransmitted control frames with their awkward timestamps through */
+      history_put(jb,ts,now);
+
+      /* calculate clock drift */
+      jb->info.drift = (int) ((999.0 * jb->info.drift + ts - now)/1000.0);
+    }
+
+    /* adjust if we haven't adjusted for 200ms */
+    if(now - jb->info.last_adjustment > 200) jb_adjust(jb, now);
+
+    /* if it is voice (ms != 0) and too late, drop */
+    /* XXX check on this calculation */
+    if((type == JB_TYPE_VOICE)  && ts < (jb->info.last_ts - jb->info.length + jb->info.drift)) {
+
+	/* if we didn't just adjust, do so now, because we want to grow quickly */
+	if(now > jb->info.last_adjustment) jb_adjust(jb, now);
+
+	jb->info.frames_late++;
+	jb->info.frames_lost--;
+	return JB_OK;
+    }
+
+
+    queue_put(jb,data,type,ms,ts);
+
+    jb_dbginfo(jb);
+#endif
+
+    return JB_OK;
+}
+
+/* this is the adjustment to be applied to the next outgoing frame timestamp */
+static long jb_getadjustment(jitterbuf *jb) { return (jb->info.length - jb->info.drift); }
+
+int jb_get(jitterbuf *jb, jb_frame *frameout, long now) {
+    jb_frame *frame;
+
+    jb_dbg("jb_get(%x,%x,%ld)\n", jb, frameout, now);
+
+#ifdef SPEEX
+    {
+	long *current_timestamp = NULL;  /* XXX: FIXME */
+	int i;
+	int ret;
+	float late_ratio_short;
+	float late_ratio_long;
+	float ontime_ratio_short;
+	float ontime_ratio_long;
+	float early_ratio_short;
+	float early_ratio_long;
+
+	late_ratio_short = 0;
+	late_ratio_long = 0;
+	for (i=0;i<LATE_BINS;i++)
+	{
+	  late_ratio_short += jb->shortterm_margin[i];
+	  late_ratio_long += jb->longterm_margin[i];
+	}
+	ontime_ratio_short = jb->shortterm_margin[LATE_BINS];
+	ontime_ratio_long = jb->longterm_margin[LATE_BINS];
+	early_ratio_short = early_ratio_long = 0;
+	for (i=LATE_BINS+1;i<MAX_MARGIN;i++)
+	{
+	  early_ratio_short += jb->shortterm_margin[i];
+	  early_ratio_long += jb->longterm_margin[i];
+	}
+	if (0&&jb->pointer_timestamp%1000==0)
+	{
+	  jb_dbg("%f %f %f %f %f %f\n", early_ratio_short, early_ratio_long, ontime_ratio_short, ontime_ratio_long, late_ratio_short, late_ratio_long);
+	  /*fprintf (stderr, "%f %f\n", early_ratio_short + ontime_ratio_short + late_ratio_short, early_ratio_long + ontime_ratio_long + late_ratio_long);*/
+	}
+	if (late_ratio_short > .1 || late_ratio_long > .03)
+	{
+	  jb->shortterm_margin[MAX_MARGIN-1] += jb->shortterm_margin[MAX_MARGIN-2];
+	  jb->longterm_margin[MAX_MARGIN-1] += jb->longterm_margin[MAX_MARGIN-2];
+	  for (i=MAX_MARGIN-2;i>=0;i--)
+	  {
+	     jb->shortterm_margin[i+1] = jb->shortterm_margin[i];
+	     jb->longterm_margin[i+1] = jb->longterm_margin[i];
+	  }
+	  jb->shortterm_margin[0] = 0;
+	  jb->longterm_margin[0] = 0;
+	  /*fprintf (stderr, "interpolate frame\n");*/
+	  /* speex_decode_int(jb->dec, NULL, out); */
+	  if (current_timestamp)
+	     *current_timestamp = jb->pointer_timestamp;
+	  /* STATS: we're interpolating because we're growing */
+	  jb_warn("get: GROWING\n");
+	  return JB_INTERP;
+	}
+
+	/* Increment timestamp */
+	jb->pointer_timestamp += jb->frame_time;
+
+	if (late_ratio_short + ontime_ratio_short < .005 && late_ratio_long + ontime_ratio_long < .01 && early_ratio_short > .8)
+	{
+	  jb->shortterm_margin[0] += jb->shortterm_margin[1];
+	  jb->longterm_margin[0] += jb->longterm_margin[1];
+	  for (i=1;i<MAX_MARGIN-1;i++)
+	  {
+	     jb->shortterm_margin[i] = jb->shortterm_margin[i+1];
+	     jb->longterm_margin[i] = jb->longterm_margin[i+1];
+	  }
+	  jb->shortterm_margin[MAX_MARGIN-1] = 0;
+	  jb->longterm_margin[MAX_MARGIN-1] = 0;
+	  /*fprintf (stderr, "drop frame\n");*/
+
+    	  frame = queue_get(jb, jb->pointer_timestamp);
+
+	  jb->pointer_timestamp += jb->frame_time;
+
+	  if(!frame) {
+		jb_warn("get: SHRINKING & LOST\n");
+		return JB_NOFRAME; /* we lost it, but wanted to drop anyhow. How convenient */
+	  }
+
+	  *frameout = *frame;
+
+	  jb_warn("get: SHRINKING\n");
+	  return JB_DROP;
+	  
+	  /* STATS: We're dropping because we're shrinking.. */
+	}
+
+	if (current_timestamp)
+	  *current_timestamp = jb->pointer_timestamp;
+
+	/* Send zeros while we fill in the buffer */
+	if (jb->pointer_timestamp<0)
+	{
+	  jb_warn("get: FILLING\n");
+	  return JB_NOFRAME;
+	}
+
+	/* Search the buffer for a packet with the right timestamp */
+	frame = queue_get(jb,jb->pointer_timestamp);
+
+	if (!frame)
+	{
+	  /* TODO: we lose a bit of semantics here with lost_count and residual speex-bits 
+		and the reset after 25 lost.. */
+	  jb->info.frames_lost++;
+	  jb->loss_rate = .999*jb->loss_rate + .001;
+
+	  jb_warn("get: LOST\n");
+	  return JB_INTERP;
+	}
+
+	*frameout = *frame;
+	jb->loss_rate = .999*jb->loss_rate;
+	jb_warn("get: OK\n");
+	return JB_OK;
+
+    }
+#else
+    if(jb->info.frames_cur == 0)
+	return JB_NOFRAME;
+
+    jb->info.last_ts = now;
+
+    /* Adjust "now" to be some time in the past; the "length" of the jitterbuffer */
+    /* also, compensate for clock drift */
+    now -= jb_getadjustment(jb);
+
+    frame = queue_get(jb, now);
+
+    if(!frame) {
+	if(!jb->info.silence && (now > jb->info.next_voice)) {
+	    jb->info.frames_lost++;
+	    jb->info.next_voice += jb->info.last_voice_ms; 
+	    jb_warn("interp for ts=%ld\n", now);
+	    return JB_INTERP;
+	}
+	return JB_NOFRAME;
+    }
+
+    /* copy info -- should theoretically clear pointers, but doesn't matter */
+    *frameout=*frame;
+
+    /* move forward next voice expectation */
+    if(frame->type == JB_TYPE_VOICE) {
+	if(jb->info.silence) {
+	  jb->info.next_voice = now ;
+	  jb->info.silence = 0;
+	}
+	jb->info.next_voice += frame->ms;
+	jb->info.last_voice_ms = frame->ms;
+    }
+
+    if(frame->type == JB_TYPE_SILENCE)
+      jb->info.silence = 1;
+
+    jb_dbg("got frames for ts=%ld\n", now);
+    jb->info.frames_out++;
+    return JB_OK;
+#endif /* SPEEX */
+}
+
+int jb_getall(jitterbuf *jb, jb_frame *frameout) {
+    jb_frame *frame;
+    frame = queue_get(jb, 0);
+
+    if(!frame) {
+      return JB_NOFRAME;
+    }
+
+    *frameout = *frame;
+    return JB_OK;
+}
+
+long jb_next(jitterbuf *jb) {
+#ifdef SPEEX
+    if(jb->frames && jb->frames->ts < jb->pointer_timestamp)
+	return jb->frames->ts;
+    return jb->pointer_timestamp;
+#else
+    if(jb->frames) /* adjusted to caller's time - see jb_get */
+    {
+       long next = jb->frames->ts + jb_getadjustment(jb);
+       if(!jb->info.silence && (jb->info.next_voice < next)) next = jb->info.next_voice;
+       return next;
+    }
+    else
+      return 0;
+#endif
+}
+
+int jb_getinfo(jitterbuf *jb, jb_info *stats) {
+
+    jb->info.jitter = history_get(jb);
+
+    *stats = jb->info;
+
+  return JB_OK;
+}
+
+int jb_setinfo(jitterbuf *jb, jb_info *settings) {
+  return JB_OK;
+}
+
Index: jitterbuf.h
===================================================================
RCS file: jitterbuf.h
diff -N jitterbuf.h
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ jitterbuf.h	23 Nov 2004 00:42:37 -0000
@@ -0,0 +1,127 @@
+/*
+ * jitterbuf: an application-independent jitterbuffer
+ *
+ * Copyrights:
+ * Copyright (C) 2004, Horizon Wimba, Inc.
+ *
+ * Contributors:
+ * Steve Kann <stevek at stevek.com>
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU Lesser (Library) General Public License
+ */
+
+#ifndef _JITTERBUF_H_
+#define _JITTERBUF_H_
+
+#define SPEEX
+#undef SPEEX
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* configuration constants */
+#define JB_HISTORY_SECONDS	10
+#define JB_HISTORY_MAXPERSEC	256
+
+/* return codes */
+#define JB_OK		0
+#define JB_EMPTY	1
+#define JB_NOFRAME	2
+#define JB_INTERP	3
+#define JB_DROP		4
+
+/* frame types */
+#define JB_TYPE_CONTROL	0
+#define JB_TYPE_VOICE	1
+#define JB_TYPE_VIDEO	2  /* reserved */
+#define JB_TYPE_SILENCE	3
+
+typedef struct jb_info {
+	/* statistics */
+	long frames_in;  	/* number of frames input to the jitterbuffer.*/
+	long frames_out;  	/* number of frames output from the jitterbuffer.*/
+	long frames_late; 	/* number of frames which were too late, and dropped.*/
+	long frames_lost; 	/* number of missing frames.*/
+	long frames_cur; 	/* number of frames presently in jb, awaiting delivery.*/
+	long jitter; 		/* jitter measured within current history interval*/
+	long length; 		/* the present jitterbuffer delay*/
+	long drift;		/* drift in ms between received frame clock, and local clock */
+	long last_ts;		/* the last ts that was read from the jb - in receiver's time */
+	long next_voice;	/* the next voice frame expected */
+	long last_voice_ms;	/* the duration of the last voice frame */
+	long silence;		/* we are presently playing out silence */
+	long last_adjustment;   /* the time of the last adjustment */
+} jb_info;
+
+typedef struct jb_frame {
+	void *data;		/* the frame data */
+	long ts;	/* the relative delivery time expected */
+	long ms;	/* the time covered by this frame, in sec/8000 */
+	int  type;	/* the type of frame */
+	struct jb_frame *next, *prev;
+} jb_frame;
+
+typedef struct jitterbuf {
+	jb_info info;
+
+	/* history */
+	long hist_long[JB_HISTORY_SECONDS];	/* history buckets */
+	long hist_short[JB_HISTORY_MAXPERSEC];   /* short-term history */
+	long hist_ts;				/* effective start time of last bucket */
+	int  hist_shortcur;			/* current index into short-term history */
+
+	jb_frame *frames; 		/* queued frames */
+	jb_frame *free; 		/* free frames (avoid malloc?) */
+
+#ifdef SPEEX
+#define MAX_MARGIN 12
+	int buffer_size;
+	int pointer_timestamp;
+	int reset_state;
+	int frame_time;
+
+	float shortterm_margin[MAX_MARGIN];
+	float longterm_margin[MAX_MARGIN];
+	float loss_rate;
+#endif
+} jitterbuf;
+
+
+/* new jitterbuf */
+jitterbuf *		jb_new();
+
+/* destroy jitterbuf */
+void			jb_destroy(jitterbuf *jb);
+
+/* queue a frame data=frame data, timings (in ms): ms=length of frame (for voice), ts=ts (sender's time) 
+ * now=now (in receiver's time)*/
+int 			jb_put(jitterbuf *jb, void *data, int type, long ms, long ts, long now);
+
+/* get a frame for time now (receiver's time)  return value is one of
+ * JB_OK:  You've got frame!
+ * JB_DROP: Here's an audio frame you should just drop.  Ask me again for this time..
+ * JB_NOFRAME: There's no frame scheduled for this time.
+ * JB_INTERP: Please interpolate an audio frame for this time (either we need to grow, or there was a lost frame)
+ * JB_EMPTY: The jb is empty.
+ */
+int			jb_get(jitterbuf *jb, jb_frame *frame, long now);
+
+/* when is the next frame due out, in receiver's time (0=EMPTY) 
+ * This value may change as frames are added (esp non-audio frames) */
+long			jb_next(jitterbuf *jb);
+
+/* get jitterbuf info: only "statistics" may be valid */
+int			jb_getinfo(jitterbuf *jb, jb_info *stats);
+
+/* set jitterbuf info: only "settings" may be honored */
+int			jb_setinfo(jitterbuf *jb, jb_info *settings);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif
Index: libiax2/src/iax-client.h
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/libiax2/src/iax-client.h,v
retrieving revision 1.12
diff -u -w -r1.12 iax-client.h
--- libiax2/src/iax-client.h	8 Nov 2004 17:56:32 -0000	1.12
+++ libiax2/src/iax-client.h	23 Nov 2004 00:42:37 -0000
@@ -94,6 +94,7 @@
 #endif
 #else
 typedef int (*sendto_t)(int, const void *, size_t, int, const struct sockaddr *, socklen_t);
+typedef int (*recvfrom_t)(int s, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen);
 #endif
 
 struct iax_event {
@@ -192,6 +193,11 @@
 void *iax_get_private(struct iax_session *s);
 void iax_set_sendto(struct iax_session *s, sendto_t sendto);
 
+/* to use application networking instead of internal, call this instead of iax_init,
+ * and pass in sendto and recvfrom replacements.  blocking reads may not be implemented */
+void iax_set_networking(sendto_t st, recvfrom_t rf);
+
+
 /* Handle externally received frames */
 struct iax_event *iax_net_process(unsigned char *buf, int len, struct sockaddr_in *sin);
 
Index: libiax2/src/iax.c
===================================================================
RCS file: /cvsroot/iaxclient/iaxclient/lib/libiax2/src/iax.c,v
retrieving revision 1.29
diff -u -w -r1.29 iax.c
--- libiax2/src/iax.c	30 Oct 2004 01:51:52 -0000	1.29
+++ libiax2/src/iax.c	23 Nov 2004 00:42:37 -0000
@@ -24,6 +24,7 @@
 #include <io.h>
 #include <errno.h>
 
+
 #define	snprintf _snprintf
 
 #if defined(_MSC_VER)
@@ -59,6 +60,10 @@
 
 #endif
 
+#ifdef NEWJB
+#include "jitterbuf.h"
+#endif
+
 #include "iax-client.h"
 #include "md5.h"
 
@@ -131,10 +136,14 @@
 /* Dropcount (in per-MEMORY_SIZE) usually percent */
 static int iax_dropcount = 3;
 
+/* external global networking replacements */
+static sendto_t	  iax_sendto = sendto;
+static recvfrom_t iax_recvfrom = recvfrom;
+
 struct iax_session {
 	/* Private data */
 	void *pvt;
-	/* Sendto function */
+	/* session-local Sendto function */
 	sendto_t sendto;
 	/* Is voice quelched (e.g. hold) */
 	int quelch;
@@ -216,6 +225,10 @@
 	int transferpeer;	/* for attended transfer */
 	int transfer_moh;	/* for music on hold while performing attended transfer */
 
+#ifdef NEWJB
+	jitterbuf *jb;
+#endif
+
 	/* For linking if there are multiple connections */
 	struct iax_session *next;
 };
@@ -257,6 +270,12 @@
 	s->sendto = ptr;
 }
 
+void iax_set_networking(sendto_t st, recvfrom_t rf)
+{
+	iax_sendto = st;
+	iax_recvfrom = rf;
+}
+
 /* This is a little strange, but to debug you call DEBU(G "Hello World!\n"); */ 
 #ifdef	WIN32
 #define G __FILE__, __LINE__,
@@ -418,7 +437,10 @@
 		s->peercallno = 0;
 		s->transferpeer = 0;		/* for attended transfer */
 		s->next = sessions;
-		s->sendto = sendto;
+		s->sendto = iax_sendto;
+#ifdef NEWJB
+		s->jb = jb_new();
+#endif
 		sessions = s;
 	}
 	return s;
@@ -569,6 +591,7 @@
 	int sinlen;
 	int flags;
 	
+	if(iax_recvfrom == recvfrom) {
 	if (netfd > -1) {
 		/* Sokay, just don't do anything */
 		DEBU(G "Already initialized.");
@@ -623,6 +646,7 @@
 	}
 #endif
 	portno = ntohs(sin.sin_port);
+	}
 	srand(time(NULL));
 	callnums = rand() % 32767 + 1;
 	transfer_id = rand() % 32767 + 1;
@@ -1097,6 +1121,15 @@
 				prev->next = session->next;
 			else
 				sessions = session->next;
+#ifdef NEWJB
+			{
+			    jb_frame frame;
+			    while(jb_getall(session->jb,&frame) == JB_OK) 
+				iax_event_free(frame.data);
+		   	
+			    jb_destroy(session->jb);
+			}
+#endif
 			free(session);
 			return;
 		}
@@ -1640,6 +1673,36 @@
 
 #define FUDGE 1
 
+#ifdef NEWJB
+/* From chan_iax2/steve davies:  need to get permission from steve or digium, I guess */
+static long unwrap_timestamp(long ts, long last)
+{
+        int x;
+
+        if ( (ts & 0xFFFF0000) == (last & 0xFFFF0000) ) {
+                x = ts - last;
+                if (x < -50000) {
+                        /* Sudden big jump backwards in timestamp:
+                           What likely happened here is that miniframe timestamp has circled but we haven't
+                           gotten the update from the main packet.  We'll just pretend that we did, and
+                           update the timestamp appropriately. */
+                        ts = ( (last & 0xFFFF0000) + 0x10000) | (ts & 0xFFFF);
+                                DEBU(G "schedule_delivery: pushed forward timestamp\n");
+                }
+                if (x > 50000) {
+                        /* Sudden apparent big jump forwards in timestamp:
+                           What's likely happened is this is an old miniframe belonging to the previous
+                           top-16-bit timestamp that has turned up out of order.
+                           Adjust the timestamp appropriately. */
+                        ts = ( (last & 0xFFFF0000) - 0x10000) | (ts & 0xFFFF);
+                                DEBU(G "schedule_delivery: pushed back timestamp\n");
+                }
+        }
+	return ts;
+}
+#endif
+
+
 static struct iax_event *schedule_delivery(struct iax_event *e, unsigned int ts, int updatehistory)
 {
 	/* 
@@ -1676,6 +1739,26 @@
 	}
 #endif
 	
+#ifdef NEWJB
+	{
+	    int type = JB_TYPE_CONTROL;
+	    int len = 0;
+
+	    if(e->etype == IAX_EVENT_VOICE) {
+	      type = JB_TYPE_VOICE;
+	      len = 20; /* XXX use correct values here -- iLBC case ? */
+	    }
+	    /* TODO2: CNG frame types need to be parsed and used here */
+
+	    /* insert into jitterbuffer */
+	    /* TODO: Perhaps we could act immediately if it's not droppable and late */
+	    jb_put(e->session->jb, e, type, len,
+		    unwrap_timestamp(ts,e->session->last_ts), 
+		    calc_rxstamp(e->session)
+	    );
+	}
+#else
+	
 	/* How many ms from now should this packet be delivered? (remember
 	   this can be a negative number, too */
 	ms = calc_rxstamp(e->session) - ts;
@@ -1798,6 +1881,7 @@
 #ifdef EXTREME_DEBUG
 	DEBU(G "Delivering packet in %d ms\n", ms);
 #endif
+#endif /* NEWJB */
 	return NULL;
 	
 }
@@ -2275,7 +2359,7 @@
 	struct sockaddr_in sin;
 	int sinlen;
 	sinlen = sizeof(sin);
-	res = recvfrom(netfd, buf, sizeof(buf), 0, (struct sockaddr *) &sin, &sinlen);
+	res = iax_recvfrom(netfd, buf, sizeof(buf), 0, (struct sockaddr *) &sin, &sinlen);
 	if (res < 0) {
 #ifdef	WIN32
 		if (WSAGetLastError() != WSAEWOULDBLOCK) {
@@ -2415,6 +2499,59 @@
 		free(cur);
 	}
 
+#ifdef NEWJB
+	/* get jitterbuffer-scheduled events */
+	{
+	    struct iax_session *cur;
+	    jb_frame frame;
+	    for(cur=sessions; cur; cur=cur->next) {
+		int ret;
+		long now;
+
+		now = (tv.tv_sec - cur->rxcore.tv_sec) * 1000 +
+		      (tv.tv_usec - cur->rxcore.tv_usec) / 1000;
+
+		if(now > jb_next(cur->jb)) {
+		    ret = jb_get(cur->jb,&frame,now);
+		    switch(ret) {
+			case JB_OK:
+			    event = frame.data;
+			    event = handle_event(event);
+			    if (event) {
+				    return event;
+			    }
+			break;
+			case JB_INTERP:
+			    /* create an interpolation frame */
+			    //fprintf(stderr, "Making Interpolation frame\n");
+			    event = (struct iax_event *)malloc(sizeof(struct iax_event));
+			    if (event) {
+				    event->etype    = IAX_EVENT_VOICE;
+				    event->subclass = cur->voiceformat;
+				    event->ts	    = now; /* XXX: ??? applications probably ignore this anyway */
+				    event->session  = cur;
+				    event->datalen  = 0;
+				    event = handle_event(event);
+				    if(event)
+					return event;
+			    }
+			break;
+			case JB_DROP:
+			    iax_event_free(frame.data);
+			break;
+			case JB_NOFRAME:
+			case JB_EMPTY:
+			    /* do nothing */
+			break;
+			default:
+			    /* shouldn't happen */
+			break;
+		    }
+		}
+	    }
+	}
+
+#endif
 	/* Now look for networking events */
 	if (blocking) {
 		/* Block until there is data if desired */
@@ -2426,6 +2563,7 @@
 	
 		nextEventTime = iax_time_to_next_event(); 
 
+
 		if(nextEventTime < 0) 
 		select(netfd + 1, &fds, NULL, NULL, NULL);
 		else 
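
For illustration, the wraparound handling in unwrap_timestamp() in the patch can be exercised on its own.  What follows is a minimal standalone sketch, not part of the patch: it repeats the same two checks without the DEBU logging, uses a long for the difference, and assumes the caller has already filled the upper 16 bits of a miniframe timestamp from the last full timestamp.  The helper unwrap_timestamp_selfcheck() and the sample values are hypothetical.

```c
#include <assert.h>

/* Same checks as unwrap_timestamp() in the patch, minus the DEBU logging.
 * ts   = incoming timestamp (upper 16 bits assumed copied from "last")
 * last = last timestamp seen on this session */
static long unwrap_timestamp(long ts, long last)
{
	long x;

	if ((ts & 0xFFFF0000) == (last & 0xFFFF0000)) {
		x = ts - last;
		if (x < -50000) {
			/* low 16 bits wrapped: move into the next 64k window */
			ts = ((last & 0xFFFF0000) + 0x10000) | (ts & 0xFFFF);
		}
		if (x > 50000) {
			/* stale miniframe: it belongs to the previous 64k window */
			ts = ((last & 0xFFFF0000) - 0x10000) | (ts & 0xFFFF);
		}
	}
	return ts;
}

/* Hypothetical self-check covering the three cases. */
static void unwrap_timestamp_selfcheck(void)
{
	/* ordinary 20 ms step: unchanged */
	assert(unwrap_timestamp(1020, 1000) == 1020);
	/* low bits wrapped from 0xFFF0 to 0x0010: pushed forward */
	assert(unwrap_timestamp(0x10010, 0x1FFF0) == 0x20010);
	/* late frame from the previous window: pushed back */
	assert(unwrap_timestamp(0x2FFF0, 0x20010) == 0x1FFF0);
}
```

Calling unwrap_timestamp_selfcheck() from a test main() is enough to run all three cases.  The 50000 threshold is taken from the patch as-is.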

