Index: Makefile =================================================================== RCS file: /usr/cvsroot/asterisk/Makefile,v retrieving revision 1.170 diff -u -r1.170 Makefile --- Makefile 23 Jun 2005 17:32:36 -0000 1.170 +++ Makefile 27 Jun 2005 11:53:18 -0000 @@ -117,6 +117,8 @@ #OPTIONS += -DLOW_MEMORY #endif +OPTIONS += -DNEWJB + # Optional debugging parameters DEBUG_THREADS = #-DDEBUG_THREADS #-DDO_CRASH #-DDETECT_DEADLOCKS Index: rtp.c =================================================================== RCS file: /usr/cvsroot/asterisk/rtp.c,v retrieving revision 1.136 diff -u -r1.136 rtp.c --- rtp.c 18 Jun 2005 17:13:01 -0000 1.136 +++ rtp.c 27 Jun 2005 11:53:20 -0000 @@ -67,6 +67,67 @@ #define FLAG_3389_WARNING (1 << 0) + +/* Slav - some static jitterbuffer stuff */ +#ifdef USE_JB +#include "jitterbuf.h" +#include +#include +#include +#endif + +#ifdef USE_JB +/* Macros for JB logs */ +/*#define jb_verbose(...) ast_verbose(VERBOSE_PREFIX_3 " ***[RTP JB LOG]*** " __VA_ARGS__)*/ +#define jb_verbose(...) if(1){\ + char tmp[192];\ + char msg[128];\ + snprintf(msg, sizeof(msg), VERBOSE_PREFIX_3 "***[RTP JB LOG]*** " __VA_ARGS__);\ + ast_verbose("%s\n", term_color(tmp, msg, COLOR_BRGREEN, 0, sizeof(tmp)));} + +/* Macros for the frame log files */ +#define jb_framelog(...) if(rtp->jb_logfile) {fprintf(rtp->jb_logfile, __VA_ARGS__); fflush(rtp->jb_logfile);} + +/* Macros for getting string representation of the jb type */ +static const char *jb_types[] = {"JB_TYPE_CONTROL", "JB_TYPE_VOICE", "JB_TYPE_VIDEO", "JB_TYPE_SILENCE"}; +#define jb_type(type) ((type >= 0 && type <= 3) ? jb_types[type] : "UNKNOWN") + +/* Counter for the frame log files */ +static int rtp_framelog_counter = 0; + +/* Scheduler context for the jitterbuffer */ +static struct sched_context *jb_sched; + +/* Scheduler thread */ +static pthread_t jb_sched_thread; + +/* Mutex for the scheduler queue synchronization */ +AST_MUTEX_DEFINE_STATIC(jb_sched_lock); + +/* Conditional variable for the scheduler queue synchronization */ +static pthread_cond_t jb_sched_flag = PTHREAD_COND_INITIALIZER; + +/* Linked list queue from rtps to be destroyed */ +static struct ast_rtp *rtp_free_head = NULL; +static struct ast_rtp *rtp_free_tail = NULL; + +/* JB function prototypes */ +static void rtp_free_enqueue(struct ast_rtp *rtp); +static struct ast_rtp * rtp_free_dequeue(void); +static void rtp_free_destroy_all(void); +static void * jb_scheduler_thread(void *data); +static struct ast_frame *ast_rtp_read_real(struct ast_rtp *rtp); +static int put_in_jb(struct ast_rtp *rtp, struct ast_frame *f); +static int get_from_jb(void *data); +static void rtp_jb_clear(struct ast_rtp *rtp); +static void rtp_jb_destroy(struct ast_rtp *rtp); +static int calc_rxstamp_jb(struct ast_rtp *rtp); +static int get_samples(struct ast_frame *f); +static void jb_get_timespec(struct timespec *ts, int millisec_offset); +static int rtpread(int *id, int fd, short events, void *cbdata); +#endif +/* End Slav */ + struct ast_rtp { int s; char resp; @@ -104,6 +165,31 @@ int rtp_lookup_code_cache_result; int rtp_offered_from_local; struct ast_rtcp *rtcp; + + /* Slav - jitterbuffer related members */ +#ifdef USE_JB + /* The jitterbuffer itself */ + jitterbuf *jb; + /* Scheduler id */ + int jbid; + /* Timestamp of the last frame received. Used to detect big jumps in the timestamps */ + unsigned int prev_timestamp; + /* Mutex for synchronization of the jb_put() - jb_get() operations and for destroy safety also */ + ast_mutex_t jblock; + /* Indicates that this rtp has passed ast_rtp_destroy() */ + int destroyed; + /* The pvt helper */ + struct ast_rtp_pvt_helper pvt_helper; + /* Points to next rtp in the free linked list */ + struct ast_rtp *next; + /* is the channel up? */ + int isup; + /* File for frame timestamp tracing */ + FILE *jb_logfile; + /* Pathname of the log file */ + char jb_logfile_pathname[PATH_MAX]; +#endif + /* End Slav */ }; struct ast_rtcp { @@ -303,6 +389,656 @@ return f; } + + + +/* Slav - ****************************************************** + ******************** Utility functions ********************** + *************************************************************/ + +/* Slav - got this from the ast_rtp_read() func - its very possible we don't need this function, + because from ast_rtp_read() is seems that the number of samples is already set to the frame */ +#ifdef USE_JB +static int get_samples(struct ast_frame *f) +{ + int samples=0; + + switch(f->subclass) + { + case AST_FORMAT_ULAW: + case AST_FORMAT_ALAW: + samples = f->datalen; + break; + case AST_FORMAT_SLINEAR: + samples = f->datalen / 2; + break; + case AST_FORMAT_GSM: + samples = 160 * (f->datalen / 33); + break; + case AST_FORMAT_ILBC: + samples = 240 * (f->datalen / 50); + break; + case AST_FORMAT_ADPCM: + case AST_FORMAT_G726: + samples = f->datalen * 2; + break; + case AST_FORMAT_G729A: + samples = f->datalen * 8; + break; +#if 0 + case AST_FORMAT_G723_1: + samples = g723_samples(f->data, f->datalen); + break; +#endif + case AST_FORMAT_SPEEX: + /* assumes that the RTP packet contained one Speex frame */ + samples = 160; + break; + case AST_FORMAT_LPC10: + samples = 22 * 8; + samples += (((char *)(f->data))[7] & 0x1) * 8; + break; + default: + ast_log(LOG_NOTICE, "Unable to calculate samples for format %s\n", ast_getformatname(f->subclass)); + return samples; + } + + // TODO: Test if this doesn't brokes something. + f->samples = samples; + return samples; +} +#endif +/* End Slav */ + + +/* Slav - calculates the time now [ms] in receivers time, e.g. counted from the time of the first frame + received (actually from the "zero" of the received frames time, which can be less than the time of + the first frame received). */ +#ifdef USE_JB +static int calc_rxstamp_jb(struct ast_rtp *rtp) +{ + /* TODO: Should be rewritten to reuse the functionality of the already present + calc_rxstamp(struct timeval *tv, struct ast_rtp *rtp, unsigned int timestamp, int mark) + function, instead of duplicating it. */ + struct timeval tv; + unsigned int timestamp = rtp->lastrxts; + + if(!rtp->rxcore.tv_sec && !rtp->rxcore.tv_usec) + { + /* RTP rxcore invalid - fix it anyway */ + gettimeofday(&rtp->rxcore, NULL); + rtp->rxcore.tv_sec -= timestamp / 8000; + rtp->rxcore.tv_usec -= (timestamp % 8000) * 125; + /* Round to 20ms for nice, pretty timestamps */ + rtp->rxcore.tv_usec -= rtp->rxcore.tv_usec % 20000; + if(rtp->rxcore.tv_usec < 0) + { + /* Adjust appropriately if necessary */ + rtp->rxcore.tv_usec += 1000000; + rtp->rxcore.tv_sec -= 1; + } + + jb_framelog("JB_PUT[%x] Recalc rxcore=%ld\n", + rtp, (rtp->rxcore.tv_sec * 1000 + rtp->rxcore.tv_usec / 1000)); + } + + gettimeofday(&tv, NULL); + + return (tv.tv_sec - rtp->rxcore.tv_sec) * 1000 + (1000000 + tv.tv_usec - rtp->rxcore.tv_usec) / 1000 - 1000; +} +#endif +/* End Slav */ + + +/* Slav - utility function - fills in the members of a timespec struct with the time now + millisec_offset + millisec_offset must be between 0 and 1000 */ +#ifdef USE_JB +static void jb_get_timespec(struct timespec *ts, int millisec_offset) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000 + millisec_offset * 1000000; + if(ts->tv_nsec >= 1000000000) + { + ts->tv_sec += 1; + ts->tv_nsec -= 1000000000; + } +} +#endif +/* End Slav */ + + +/* Slav - check for rtps needing to be destroyed */ +#ifdef USE_JB +static void rtp_free_enqueue(struct ast_rtp *rtp) +{ + rtp->next = NULL; + + if(rtp_free_tail == NULL) + { + rtp_free_tail = rtp_free_head = rtp; + return; + } + + rtp_free_tail->next = rtp; + rtp_free_tail = rtp; +} + +static struct ast_rtp * rtp_free_dequeue(void) +{ + struct ast_rtp *rtp = rtp_free_head; + + if(rtp_free_head != NULL) + { + rtp_free_head = rtp_free_head->next; + if(rtp_free_head == NULL) + { + rtp_free_tail = NULL; + } + } + + return rtp; +} +#endif +/* End Slav */ + +/* End Slav - ************************************************** + ****************** End Utility functions ******************** + *************************************************************/ + + +/* Slav - ****************************************************** + ***************** Jitterbuf main functions ****************** + *************************************************************/ + +/* Slav - puts a frame into the jitterbuf */ +#ifdef USE_JB +static int put_in_jb(struct ast_rtp *rtp, struct ast_frame *f) +{ + long ts; + unsigned long ts_test; + long now; + int type; + int len; + struct ast_frame *frame; + + /* if the call is not yet answered, don't put the frame into the jb - just let is goes through + the regular way. */ + if(rtp->pvt_helper.get_chan_state(rtp->pvt_helper.pvt) != AST_STATE_UP) + { + return -1; + } + else if(!rtp->isup) + { + /* The channel is up for the very first time. */ + rtp->isup = 1; + rtp->rxcore.tv_sec = 0; + rtp->rxcore.tv_usec = 0; + jb_framelog("JB_PUT[%x] {now=%ld}: The call was answered.\n", rtp, (now = calc_rxstamp_jb(rtp))); + rtp->rxcore.tv_sec = 0; + rtp->rxcore.tv_usec = 0; + } + + /* Test for unsigned long overflow */ + if((ts_test = rtp->lastrxts / 8) > (1 << (8*sizeof(long)-1)) - 1) + { + jb_framelog("JB_PUT[%x] {now=%ld}: Received ts=%lu, which is greater than the max unsigned long number!\n", + rtp, (now = calc_rxstamp_jb(rtp)), ts_test); + ast_log(LOG_ERROR, "Received ts=%lu, which is greater than the max unsigned long number!\n", ts_test); + /* assert if in debug */ + assert(0); + /* else just return and do nothing */ + return 0; + } + + /* convert the timestamp of the frame from samples to millisec */ + ts = rtp->lastrxts / 8; + + /* try to detect a jump in the timestaps */ + if(rtp->prev_timestamp != -1) + { + int diff = (unsigned int) ts - rtp->prev_timestamp; + /* We consider the jump for a big, if its greater than 3 * 160 ... */ + if(diff > 3 * 160 || diff < -3 * 160) + { + /* Reset rxcore - this will cause calc_rxstamp_jb() to recalc it */ + rtp->rxcore.tv_sec = 0; + rtp->rxcore.tv_usec = 0; + // TODO: Reset also the jitterbuffer? + } + } + + /* Store the last timestamp */ + rtp->prev_timestamp = (unsigned int) ts; + + type = JB_TYPE_CONTROL; + len = 0; /* Length of the frame (in milleisec) */ + + if(f->frametype == AST_FRAME_VOICE) + { + type = JB_TYPE_VOICE; + len = get_samples(f) / 8; + // TODO: Cant we use insted just:? + //len = f->samples + } + else if(f->frametype == AST_FRAME_CNG) + { + type = JB_TYPE_SILENCE; + } + + /* Allocate a new frame */ + frame = ast_frisolate(f); + if(frame == NULL) + { + ast_log(LOG_ERROR, "Cannot isolate frame for the jitterbuffer.\n"); + return -1; + } + + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + + /* The moment now in receivers time (in millisec) */ + now = calc_rxstamp_jb(rtp); + + /* Put into the jitterbuffer. If jb_put returns JB_DROP, the frame should be dropped */ + if(jb_put(rtp->jb, frame, type, len, ts, now) == JB_DROP) + { + jb_framelog("JB_PUT[%x] {now=%ld}: Dropped %s frame with ts=%lu\n", rtp, now, jb_type(type), ts); + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + /* Drop the frame - just do nothing, except of cource, freeing the isolated frame. */ + ast_frfree(frame); + } + else + { + jb_framelog("JB_PUT[%x] {now=%ld}: Queued %s frame with ts=%lu\n", rtp, now, jb_type(type), ts); + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + } + + return 0; +} +#endif +/* End Slav */ + + +/* Slav - gets a frame from the jitterbuf for time now */ +#ifdef USE_JB +static int get_from_jb(void *data) +{ + struct ast_rtp *rtp = (struct ast_rtp *) data; + jb_frame frame; + struct ast_frame af; + int ret; + long now; + long next; + unsigned int when; + unsigned int tmp; + struct timeval tv; + + if(rtp == NULL) + { + /* this should never happens */ + ast_log(LOG_ERROR, "Received NULL rtp data!\n"); + /* if in debug - abort asterisk */ + assert(0); + /* else - return 0 to request schedule removal (don't post to the semaphore) */ + /* TODO: Should we do this? Maybe is better to crash or to return 1... */ + return 0; + } + + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + + /* check for destroy conditon */ + if(rtp->destroyed) + { + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + /* return 1 and do nothing - this rtp should be added in the free list, so later, + after all schedules are executed, this sched will be safely removed. */ + return 1; + } + + /* is it time for the next frame in the jb? */ + gettimeofday(&tv, NULL); + //now = (tv.tv_sec - rtp->rxcore.tv_sec) * 1000 + (tv.tv_usec - rtp->rxcore.tv_usec) / 1000; + now = (tv.tv_sec - rtp->rxcore.tv_sec) * 1000 + (1000000 + tv.tv_usec - rtp->rxcore.tv_usec) / 1000 - 1000; + if(now >= (next = jb_next(rtp->jb))) + { + /* get a frame from the jb for time now */ + /* buggerit! i just hardcode the length to 20ms since i don't use iLBC */ + ret = jb_get(rtp->jb, &frame, now, 20); + + /* Proceed with the frame timestamps logging while still holding the jb mutex lock */ + if(rtp->jb_logfile != NULL && rtp->pvt_helper.get_chan_state(rtp->pvt_helper.pvt) == AST_STATE_UP) + { + switch(ret) + { + case JB_OK: + jb_framelog("\tJB_GET[%x] {now=%ld}: Delivered %s frame with timestamp %ld\n", + rtp, now, jb_type(frame.type), frame.ts); + break; + case JB_INTERP: + jb_framelog("\tJB_GET[%x] {now=%ld}: Interpolated frame\n", rtp, now); + break; + case JB_DROP: + jb_framelog("\tJB_GET[%x] {now=%ld}: Dropped %s frame with timestamp %ld\n", + rtp, now, jb_type(frame.type), frame.ts); + break; + case JB_NOFRAME: + jb_framelog("\tJB_GET[%x] {now=%ld}: No frame\n", rtp, now); + break; + case JB_EMPTY: + jb_framelog("\tJB_GET[%x] {now=%ld}: Jb is empty\n", rtp, now); + break; + default: + ast_log(LOG_ERROR, "This should never happens!"); + break; + } + } + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + switch(ret) + { + case JB_OK: + /* don't deliver if the channel is not up */ + if(rtp->pvt_helper.get_chan_state(rtp->pvt_helper.pvt) == AST_STATE_UP) + { + /* deliver the frame into the channel's queue (internally locks the channel mutex!) */ + rtp->pvt_helper.queue_frame(rtp->pvt_helper.pvt, frame.data); + } + ast_frfree(frame.data); + break; + case JB_INTERP: + /* don't interpolate if the channel is not up */ + if(rtp->pvt_helper.get_chan_state(rtp->pvt_helper.pvt) == AST_STATE_UP) + { + /* Create an interpolation frame */ + memset(&af, 0, sizeof(struct ast_frame)); + // TODO: Remove the unnecessary member zero initializations + af.frametype = AST_FRAME_VOICE; + af.subclass = rtp->pvt_helper.get_nativeformats(rtp->pvt_helper.pvt); + af.datalen = 0; + af.samples = frame.ms * 8; + /* Mark as non malloced, because of ast_queue_frame? */ + af.mallocd = 0; + af.src = "SIP JB interpolation"; + af.data = NULL; + // TODO: Do we need to set a delivery timeval? + af.delivery.tv_sec = rtp->rxcore.tv_sec; + af.delivery.tv_usec = rtp->rxcore.tv_usec; + af.delivery.tv_sec += next / 1000; + af.delivery.tv_usec += (next % 1000) * 1000; + af.offset=AST_FRIENDLY_OFFSET; + if(af.delivery.tv_usec >= 1000000) + { + af.delivery.tv_usec -= 1000000; + af.delivery.tv_sec += 1; + } + /* deliver the frame into the channel's queue (internally locks the channel mutex!) */ + rtp->pvt_helper.queue_frame(rtp->pvt_helper.pvt, &af); + } + break; + case JB_DROP: + /* Just free the frame */ + ast_frfree(frame.data); + break; + case JB_NOFRAME: + break; + case JB_EMPTY: + break; + default: + ast_log(LOG_ERROR, "This should never happens!"); + break; + } + } + else + { + jb_framelog("\tJB_GET[%x] {now=%ld}: now <= next=%ld\n", rtp, now, next); + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + } + + /* Calc how many millisec remains to the next time a frame should be delivered */ + gettimeofday(&tv, NULL); + //now = (tv.tv_sec - rtp->rxcore.tv_sec) * 1000 + (tv.tv_usec - rtp->rxcore.tv_usec) / 1000; + when = jb_next(rtp->jb) - now; + if(when < 10 || when > 30) + { + /* Obviously the jitterbuffer is confused. Use the length of the last frame. */ + tmp = (frame.ms >= 10 && frame.ms <= 20) ? frame.ms : 20; + jb_framelog("\tJB_GET[%x] {now=%ld}: Invalid when=%u. Resetting to %d ms.\n", rtp, now, when, tmp); + when = tmp; + } + when--; + + /* reschedule (adds again this schedule) */ + rtp->jbid = ast_sched_add(jb_sched, when, get_from_jb, rtp); + + return 0; + + //return 1; +} +#endif +/* End Slav */ + + +/* Slav - called from a channel who wants a jitterbuff enabled - preforms jitterbuff initialization */ +#ifdef USE_JB +void ast_rtp_jb_enable(struct ast_rtp *rtp, struct ast_rtp_pvt_helper *pvt_helper, + struct sched_context *ex_sched, struct io_context *ex_io) +{ +#ifdef NEWJB + jb_conf jbconf; +#else /* NEWJB */ + jb_info jbinfo; +#endif /* NEWJB */ + + /* Shouldn't be called more than once on a given rtp! */ + assert(rtp->jb == NULL); + + /* Initialize the prev timestamp */ + rtp->prev_timestamp = -1; + + /* Initialize the destroy flag */ + rtp->destroyed = 0; + + /* Initialize the jb sched id */ + rtp->jbid = -1; + + /* Create the jitterbuf */ + rtp->jb = jb_new(); + + /* Is the new jitterbuf successfuly created? */ + if(rtp->jb != NULL) + { + /* Copy the pvt_helper data. TODO: Make sure the pvt_helper members are valid */ + memcpy(&rtp->pvt_helper, pvt_helper, sizeof(struct ast_rtp_pvt_helper)); + + /* Set the maximum lenght of the jitterbuffer (in milliseconds) */ +#ifdef NEWJB + jbconf.max_jitterbuf = 500; + jb_setconf(rtp->jb,&jbconf); +#else /* NEWJB */ + jbinfo.max_jitterbuf = 500; + jb_setinfo(rtp->jb,&jbinfo); +#endif /* NEWJB */ + + /* Initialize the jb mutex */ + ast_mutex_init(&rtp->jblock); + + /* lock the queue mutex */ + ast_mutex_lock(&jb_sched_lock); + + /* Create a frame log file */ + /* + snprintf(rtp->jb_logfile_pathname, sizeof(rtp->jb_logfile_pathname), + "/tmp/jb_frames_%.5d.log", rtp_framelog_counter++); + rtp->jb_logfile = fopen(rtp->jb_logfile_pathname, "w+b"); + */ + + /* add a new schedule for get_from_jb() (scheduled each 20 ms?) */ + rtp->jbid = ast_sched_add(jb_sched, 20, get_from_jb, rtp); + + /* signal the schedule queue to wakeup the scheduler (if its waiting) */ + pthread_cond_signal(&jb_sched_flag); + + /* unlock the queue mutex */ + ast_mutex_unlock(&jb_sched_lock); + + jb_verbose("Jitterbuffer enabled."); + } + else + { + /* Cannot allocate memory for the jb - only log a warning and continue using the + channel without a jb */ + ast_log(LOG_WARNING, "Unable to allocate new jitterbuffer.\n"); + } +} +#endif +/* End Slav */ + + +/* Slav - removes all remaining frames from the jb and frees them */ +#ifdef USE_JB +static void rtp_jb_clear(struct ast_rtp *rtp) +{ + jb_frame frame; + + /* remove and free any remaining frames in the jb */ + while(jb_getall(rtp->jb,&frame) == JB_OK) + { + ast_frfree(frame.data); + } +} +#endif +/* End Slav */ + + +/* Slav - destroys the jitterbuf and the jb mutex and frees its rtp container */ +#ifdef USE_JB +static void rtp_jb_destroy(struct ast_rtp *rtp) +{ + /* destroy the jitterbuffer */ + jb_destroy(rtp->jb); + rtp->jb = NULL; + + /* close the framelog file */ + if(rtp->jb_logfile != NULL) + { + fseek(rtp->jb_logfile, 0, SEEK_END); + int len = ftell(rtp->jb_logfile); + fclose(rtp->jb_logfile); + rtp->jb_logfile = NULL; + + /* If there is less than 1024 bytes logged in this file - delete it */ + if(len < 1024) + { + remove(rtp->jb_logfile_pathname); + } + } + + /* destroy the jb mutex */ + ast_mutex_destroy(&rtp->jblock); + + /* finally free the rtp */ + free(rtp); + + jb_verbose("Jitterbuffer destroyed."); +} +#endif +/* End Slav */ + + +/* Slav - destroys all rtps on the free list */ +#ifdef USE_JB +static void rtp_free_destroy_all(void) +{ + struct ast_rtp *rtp; + + while((rtp = rtp_free_dequeue()) != NULL) + { + /* remove the jb sched */ + if(rtp->jbid != -1) + { + ast_sched_del(jb_sched, rtp->jbid); + rtp->jbid = -1; + } + + /* clear the jb */ + rtp_jb_clear(rtp); + + /* destroy the jb and free the rtp */ + rtp_jb_destroy(rtp); + } +} +#endif +/* End Slav */ + + +/* Slav - jitterbuffer scheduler thread function */ +#ifdef USE_JB +static void * jb_scheduler_thread(void *data) +{ + int when; + struct timespec abstime; + + jb_verbose("The rtp jitterbuffer scheduler thread is running."); + + while(1) + { + /* lock the queue mutex */ + ast_mutex_lock(&jb_sched_lock); + + /* get how many millisecs to wait until the first event scheduled */ + when = ast_sched_wait(jb_sched); + if(when < 0 || when > 1000) + { + when = 1000; + } + + /* wait until its time for the first schedule - can be interrupted by adding new + schedule, or by adding an rtp to the free list for destroying. + (internally unlocks the queue mutex and locks it after the waiting is finished) */ + jb_get_timespec(&abstime, when); + pthread_cond_timedwait(&jb_sched_flag, &jb_sched_lock, &abstime); + + /* destroy all rtps on the free list */ + rtp_free_destroy_all(); + + /* unlock the queue mutex */ + ast_mutex_unlock(&jb_sched_lock); + + /* exec all for time now */ + ast_sched_runq(jb_sched); + } + + return NULL; +} +#endif +/* End Slav */ + + + +/* End Slav - ************************************************** + *************** End Jitterbuf main functions **************** + *************************************************************/ + + + + + static int rtpread(int *id, int fd, short events, void *cbdata) { struct ast_rtp *rtp = cbdata; @@ -383,7 +1119,46 @@ } } + +/* Slav - the jitterbuf version of ast_rtp_read(). Returns AST_FRAME_NULL if the jb is used */ +#ifdef USE_JB +struct ast_frame *ast_rtp_read(struct ast_rtp *rtp) +{ + static struct ast_frame *frr, nf = { AST_FRAME_NULL, }; + + /* Is a jitterbuffer used for this rtp? */ + if(rtp->jb != NULL) + { + /* Read a frame from the network */ + frr = ast_rtp_read_real(rtp); + + /* Put the frame into the jb. */ + if(put_in_jb(rtp, frr) == 0) + { + return &nf; + } + + /* If put_in_jb() returns non zero (e.g. the channel is not up), deliver the frame immediately. */ + return frr; + } + + /* If not using jb, deliver the frame immediately. */ + return ast_rtp_read_real(rtp); +} +#endif +/* End Slav */ + + +/* Slav - the real ast_rtp_read() function will be used if the jitterbuffer is not used */ +#ifndef USE_JB +/* End Slav */ struct ast_frame *ast_rtp_read(struct ast_rtp *rtp) +/* Slav - else predefine the real ast_rtp_read() func, so the jb version will be used from + all rtp channels */ +#else +static struct ast_frame *ast_rtp_read_real(struct ast_rtp *rtp) +#endif +/* End Slav */ { int res; struct sockaddr_in sin; @@ -1008,7 +1783,40 @@ close(rtp->rtcp->s); free(rtp->rtcp); } + /* Slav - don't free the rtp here if we are using the jb - just add it to the free list. */ +#ifndef USE_JB + /* End Slav */ free(rtp); + /* Slav - add the rtp to the free list, but only if a jb is enabled */ +#else + /* Does a jitterbuf is used for this rtp? */ + if(rtp->jb != NULL) + { + /* lock the jb mutex */ + ast_mutex_lock(&rtp->jblock); + + /* set the destroy flag to let get_from_jb() knows that the rtp is on the free list */ + rtp->destroyed = 1; + + /* unlock the jb mutex */ + ast_mutex_unlock(&rtp->jblock); + + /* lock the sched queue mutex */ + ast_mutex_lock(&jb_sched_lock); + + /* add the rtp to the free list */ + rtp_free_enqueue(rtp); + + /* unlock the sched queue mutex */ + ast_mutex_unlock(&jb_sched_lock); + } + else + { + /* No - free the rtp */ + free(rtp); + } +#endif + /* End Slav */ } static unsigned int calc_txstamp(struct ast_rtp *rtp, struct timeval *delivery) @@ -1759,4 +2567,25 @@ ast_cli_register(&cli_debug_ip); ast_cli_register(&cli_no_debug); ast_rtp_reload(); + + /* Slav - init the rtp jb scheduler stuff */ +#ifdef USE_JB + /* Create a scheduler context */ + jb_sched = sched_context_create(); + if(jb_sched == NULL) + { + ast_log(LOG_ERROR, "Unable to create rtp jb scheduler context.\n"); + assert(0); + } + + /* Start the scheduler thread */ + if(ast_pthread_create(&jb_sched_thread, NULL, jb_scheduler_thread, NULL) < 0) + { + ast_log(LOG_ERROR, "Unable to start rtp jb scheduler thread.\n"); + assert(0); + } + + jb_verbose("Started the rtp jitterbuffer scheduler thread."); +#endif + /* End Slav */ } Index: channels/chan_sip.c =================================================================== RCS file: /usr/cvsroot/asterisk/channels/chan_sip.c,v retrieving revision 1.771 diff -u -r1.771 chan_sip.c --- channels/chan_sip.c 24 Jun 2005 02:15:04 -0000 1.771 +++ channels/chan_sip.c 27 Jun 2005 11:53:25 -0000 @@ -2766,10 +2766,58 @@ snprintf(callid, len, "@%s", ast_inet_ntoa(iabuf, sizeof(iabuf), ourip)); } +/* Slav - functions for the rtp pvt helper */ +#ifdef USE_JB +static int sip_queue_frame(void *pvt, struct ast_frame *f) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + if(p->owner != NULL) + { + return ast_queue_frame(p->owner, f); + } + else + { + return -1; + } +} + +static int sip_get_chan_state(void *pvt) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + if(p->owner != NULL) + { + return p->owner->_state; + } + else + { + return -1; + } +} + +static int sip_get_nativeformats(void *pvt) +{ + struct sip_pvt *p = (struct sip_pvt *) pvt; + if(p->owner != NULL) + { + return p->owner->nativeformats; + } + else + { + return -1; + } +} +#endif +/* End Slav */ + /*--- sip_alloc: Allocate SIP_PVT structure and set defaults ---*/ static struct sip_pvt *sip_alloc(char *callid, struct sockaddr_in *sin, int useglobal_nat, const int intended_method) { struct sip_pvt *p; + /* Slav - pvt_helper fot the rtp jitterbuffer */ +#ifdef USE_JB + struct ast_rtp_pvt_helper pvt_helper; +#endif + /* End Slav */ p = malloc(sizeof(struct sip_pvt)); if (!p) @@ -2801,7 +2849,20 @@ if (sip_methods[intended_method].need_rtp) { p->rtp = ast_rtp_new_with_bindaddr(sched, io, 1, 0, bindaddr.sin_addr); - if (videosupport) + + /* Slav - enable a jitterbuffer on the newly created rtp */ +#ifdef USE_JB + /* Init the pvt helper functions */ + pvt_helper.pvt = p; + pvt_helper.queue_frame = sip_queue_frame; + pvt_helper.get_chan_state = sip_get_chan_state; + pvt_helper.get_nativeformats = sip_get_nativeformats; + /* enable the jitterbuffer */ + ast_rtp_jb_enable(p->rtp, &pvt_helper, sched, io); +#endif + /* End Slav */ + + if (videosupport) p->vrtp = ast_rtp_new_with_bindaddr(sched, io, 1, 0, bindaddr.sin_addr); if (!p->rtp) { ast_log(LOG_WARNING, "Unable to create RTP session: %s\n", strerror(errno)); Index: include/asterisk/rtp.h =================================================================== RCS file: /usr/cvsroot/asterisk/include/asterisk/rtp.h,v retrieving revision 1.22 diff -u -r1.22 rtp.h --- include/asterisk/rtp.h 21 Apr 2005 06:02:44 -0000 1.22 +++ include/asterisk/rtp.h 27 Jun 2005 11:53:32 -0000 @@ -25,6 +25,12 @@ extern "C" { #endif + +/* Slav - define USE_JB to enable the jitterbuffer for all rtp channels */ +#define USE_JB +/* End Slav */ + + /* Codes for RTP-specific data - not defined by our AST_FORMAT codes */ /*! DTMF (RFC2833) */ #define AST_RTP_DTMF (1 << 0) @@ -48,6 +54,33 @@ }; struct ast_rtp; + + +/* Slav - jitterbuffer related stuff */ +#ifdef USE_JB +typedef int (*pvt_queue_frame)(void *pvt, struct ast_frame *f); +typedef int (*pvt_get_chan_state)(void *pvt); +typedef int (*pvt_get_nativeformats)(void *pvt); + +/* Defines technology independent way to access some channel functionality */ +struct ast_rtp_pvt_helper +{ + /* Pointer to the tech private structure */ + void *pvt; + /* Pointer to a function for queueing a frame into a channel */ + pvt_queue_frame queue_frame; + /* Pointer to a function, returning the current channel state */ + pvt_get_chan_state get_chan_state; + /* Pointer to a function, returning the supported from the channel formats */ + pvt_get_nativeformats get_nativeformats; +}; + +/* Enables the jitterbuff for the specified channel (and rtp). Performs jitterbuff initialization */ +void ast_rtp_jb_enable(struct ast_rtp *rtp, struct ast_rtp_pvt_helper *pvt_helper, + struct sched_context *ex_sched, struct io_context *ex_io); +#endif +/* End Slav */ + typedef int (*ast_rtp_callback)(struct ast_rtp *rtp, struct ast_frame *f, void *data);