[svn-commits] russell: branch russell/chan_console r95353 - in /team/russell/chan_console: ...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sat Dec 29 00:10:55 CST 2007


Author: russell
Date: Sat Dec 29 00:10:50 2007
New Revision: 95353

URL: http://svn.digium.com/view/asterisk?view=rev&rev=95353
Log:
Significantly rework audio handling in this channel driver.  It seems to work
much better now, and doesn't cause portaudio to have internal lockups.  I think
that this is what I really needed to do to be able to move this forward ...

Now, all of the audio passed to and from the portaudio callback goes through
a pair of lock-free ringbuffers, so the implementation of the portaudio
callback no longer does any locking or any other operations that may sleep.

Modified:
    team/russell/chan_console/channels/chan_console.c
    team/russell/chan_console/main/frame.c
    team/russell/chan_console/utils/Makefile

Modified: team/russell/chan_console/channels/chan_console.c
URL: http://svn.digium.com/view/asterisk/team/russell/chan_console/channels/chan_console.c?view=diff&rev=95353&r1=95352&r2=95353
==============================================================================
--- team/russell/chan_console/channels/chan_console.c (original)
+++ team/russell/chan_console/channels/chan_console.c Sat Dec 29 00:10:50 2007
@@ -43,14 +43,6 @@
  * boost CLI command and .conf option
  *
  * console_video support
- *
- * TODO
- * \todo Convert the handling of playing sounds to active channels to use the
- *       indications API.  This will simplify the code and allow for localized
- *       tones as configured in indications.conf.
- * \todo Use an ast_smoother for handling frames being written out to the audio
- *       device.  This will remove the need for the write_buf and simplify the
- *       code.
  */
 
 /*** MODULEINFO
@@ -62,9 +54,7 @@
 
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
-#include <stdlib.h>
-#include <stdio.h>
-#include <unistd.h>
+#include <jack/ringbuffer.h>
 
 #include <portaudio.h>
 
@@ -96,6 +86,8 @@
 #define INPUT_CHANNELS   1
 /*! \brief Mono Output */
 #define OUTPUT_CHANNELS  1
+
+#define RINGBUFFER_SIZE 16384
 
 /*
  * XXX text message sizes are probably 256 chars, but i am
@@ -141,15 +133,6 @@
 	PaStream *stream;
 	/*! A frame for preparing to queue on to the channel */
 	struct ast_frame fr;
-	/*! Buffer to hold outgoing audio with the standard offset */
-	char read_buf[NUM_SAMPLES * 2 + AST_FRIENDLY_OFFSET];
-	/*! Buffer to hold left over samples from frames */
-	char write_buf[NUM_SAMPLES * 2];
-	/*! The index for the next write into write_buf */
-	unsigned int write_dst;
-	/*! The number of samples sent for the current sound, mod the total number
-	 *  of samples for the sound plus trailing silence. */
-	unsigned int sampsent;
 	/*! Running = 1, Not running = 0 */
 	unsigned int streamstate:1;
 	/*! On-hook = 0, Off-hook = 1 */
@@ -160,17 +143,15 @@
 	unsigned int autoanswer:1;
 	/*! Ignore context in the console dial CLI command */
 	unsigned int overridecontext:1;
-	/*! The PortAudio callback is being executed */
-	unsigned int incallback:1;
-	/*! Used by the PortAudio callback if it gets called but has to wait for
-	 *  the write function to get called to provide frames. */
-	ast_cond_t cond;
 	/*! Lock for this struct */
 	ast_mutex_t lock;
-	/*! List of frames to be written out to the device */
-	AST_LIST_HEAD_NOLOCK(, ast_frame) frames;
+	jack_ringbuffer_t *input_rb;
+	jack_ringbuffer_t *output_rb;
+	struct ast_smoother *smoother;
+	int fds[2];
 } console_pvt = {
 	.lock = AST_MUTEX_INIT_VALUE,
+	.fds = { -1, -1 },
 };
 
 /*! Global jitterbuffer configuration - by default, jb is disabled */
@@ -220,9 +201,12 @@
 static int start_stream(struct console_pvt *pvt)
 {
 	PaError res;
+	int ret_val = 0;
+
+	ast_mutex_lock(&pvt->lock);
 
 	if (pvt->streamstate)
-		return 0;
+		goto return_unlock;
 
 	pvt->streamstate = 1;
 	ast_debug(1, "Starting stream\n");
@@ -232,22 +216,23 @@
 	if (res != paNoError) {
 		ast_log(LOG_WARNING, "Failed to open default audio device - (%d) %s\n",
 			res, Pa_GetErrorText(res));
-		return -1;
+		ret_val = -1;
+		goto return_unlock;
 	}
 
 	res = Pa_StartStream(pvt->stream);
 	if (res != paNoError) {
 		ast_log(LOG_WARNING, "Failed to start stream - (%d) %s\n",
 			res, Pa_GetErrorText(res));
-		return -1;
-	}
-
-	return 0;
-}
-
-/*!
- * \note Called with pvt struct locked
- */
+		ret_val = -1;
+	}
+
+return_unlock:
+	ast_mutex_unlock(&pvt->lock);
+
+	return ret_val;
+}
+
 static int stop_stream(struct console_pvt *pvt)
 {
 	if (!pvt->streamstate)
@@ -258,7 +243,6 @@
 	Pa_CloseStream(pvt->stream);
 	pvt->stream = NULL;
 	pvt->streamstate = 0;
-	ast_cond_signal(&pvt->cond);
 	ast_mutex_unlock(&pvt->lock);
 
 	return 0;
@@ -279,6 +263,8 @@
 	chan->readformat = AST_FORMAT_SLINEAR;
 	chan->writeformat = AST_FORMAT_SLINEAR;
 	chan->tech_pvt = pvt;
+	chan->fds[0] = pvt->fds[0];
+
 	pvt->owner = chan;
 
 	if (!ast_strlen_zero(pvt->language))
@@ -354,15 +340,6 @@
 	c->tech_pvt = NULL;
 	pvt->owner = NULL;
 
-	/* Wait until the callback has exited before stopping the stream, also
-	 * ensuring that the callback isn't blocking while waiting on a frame. */
-	while (pvt->incallback) { /* :( */
-		ast_mutex_lock(&pvt->lock);
-		ast_cond_signal(&pvt->cond);
-		ast_mutex_unlock(&pvt->lock);
-		usleep(10);
-	}
-
 	stop_stream(pvt);
 
 	return 0;
@@ -370,44 +347,55 @@
 
 static int console_answer(struct ast_channel *c)
 {
+	struct console_pvt *pvt = &console_pvt;
+
+	ast_verb(1, V_BEGIN "Call from Console has been Answered" V_END);
+
+	ast_setstate(c, AST_STATE_UP);
+
+	return start_stream(pvt);
+}
+
+static struct ast_frame *console_read(struct ast_channel *chan)
+{
+	struct console_pvt *pvt = chan->tech_pvt;
+	struct pollfd pfd;
 	int res;
-	struct console_pvt *pvt = &console_pvt;
-
-	ast_verb(1, V_BEGIN "Call from Console has been Answered" V_END);
-
-	ast_mutex_lock(&pvt->lock);
-	ast_setstate(c, AST_STATE_UP);
-	res = start_stream(pvt);
-	ast_mutex_unlock(&pvt->lock);
-
-	return res;
-}
-
-/*
- * \brief Implementation of the ast_channel_tech read() callback
- *
- * Calling this function is harmless.  However, if it does get called, it
- * is an indication that something weird happened that really shouldn't
- * have and is worth looking into.
- * 
- * Why should this function not get called?  Well, let me explain.  There are
- * a couple of ways to pass on audio that has come from this channel.  The way
- * that this channel driver uses is that once the audio is available, it is
- * stuffed into an ast_frame and queued onto the channel using ast_queue_frame.
- *
- * The other method would be signalling to the core that there is audio waiting,
- * and that it needs to call the channel's read() callback to get it.  The way
- * the channel gets signalled is that one or more file descriptors are placed
- * in the fds array on the ast_channel which the core will poll() on.  When the
- * fd indicates that input is available, the read() callback is called.  This
- * is especially useful when there is a dedicated file descriptor where the
- * audio is read from.  An example would be the socket for an RTP stream.
- */
-static struct ast_frame *console_read(struct ast_channel *chan)
-{
-	ast_debug(1, "This should never be called, mmkay?!\n");
-
-	return &ast_null_frame;
+	char buf[1024];
+	struct ast_frame *fr, *ret_fr;
+	struct ast_frame f = {
+		.frametype = AST_FRAME_VOICE,
+		.subclass = AST_FORMAT_SLINEAR,
+		.src = "console_read",
+		.data = buf,
+	};
+
+	pfd.fd = pvt->fds[0];
+	pfd.events = POLLIN;
+	pfd.revents = 0;
+	while (poll(&pfd, 1, 0) == 1) {
+		int blah;
+		read(pvt->fds[0], &blah, sizeof(blah));
+	}
+
+	while ((res = jack_ringbuffer_read(pvt->input_rb, buf, sizeof(buf))) > 0) {
+		f.datalen = res;
+		f.samples = res / sizeof(int16_t);
+
+		ast_smoother_feed(pvt->smoother, &f);
+	}
+
+	/* Read one frame to return, and queue up any others on the channel just
+	 * in case we have run behind a bit. */
+
+	ret_fr = ast_smoother_read(pvt->smoother);
+
+	while ((fr = ast_smoother_read(pvt->smoother))) {
+		ast_queue_frame(chan, fr);
+		ast_frfree(fr);
+	}
+
+	return ret_fr ? ret_fr : &ast_null_frame;
 }
 
 static int console_call(struct ast_channel *c, char *dest, int timeout)
@@ -417,6 +405,8 @@
 
 	ast_verb(1, V_BEGIN "Call to device '%s' on console from '%s' <%s>" V_END,
 		dest, c->cid.cid_name, c->cid.cid_num);
+
+	ast_mutex_lock(&pvt->lock);
 
 	if (pvt->autoanswer) {
 		ast_verb(1, V_BEGIN "Auto-answered" V_END);
@@ -429,7 +419,9 @@
 		f.frametype = AST_FRAME_CONTROL;
 		f.subclass = AST_CONTROL_RINGING;
 	}
-	
+
+	ast_mutex_unlock(&pvt->lock);
+
 	ast_queue_frame(c, &f);
 
 	return start_stream(pvt);
@@ -439,14 +431,16 @@
 {
 	struct ast_frame *fr = ast_frdup(f);
 	struct console_pvt *pvt = &console_pvt;
+	int res;
 
 	if (!fr)
 		return -1;
 
-	ast_mutex_lock(&pvt->lock);
-	AST_LIST_INSERT_TAIL(&pvt->frames, fr, frame_list);
-	ast_cond_signal(&pvt->cond);
-	ast_mutex_unlock(&pvt->lock);
+	res = jack_ringbuffer_write(pvt->output_rb, fr->data, fr->datalen);
+	if (res != fr->datalen) {
+		/* ast_log(LOG_WARNING, "Failed to write %d bytes to the output ringbuffer\n",
+			fr->datalen); */
+	}
 
 	return 0;
 }
@@ -494,91 +488,42 @@
 
 static int handle_output(struct console_pvt *pvt, void *output, unsigned long frame_count)
 {
-	unsigned int src = 0;
-	unsigned int num_samples;
-	struct ast_frame *fr = NULL;
-	int res = paContinue;
-
-	ast_mutex_lock(&pvt->lock);
-
-	/* Empty the write buffer */
-	if (frame_count && pvt->write_dst) {
-		num_samples = MIN(pvt->write_dst * 2, frame_count);
-		memcpy(output, pvt->write_buf, num_samples * 2);
-		pvt->write_dst -= num_samples * 2;
-		src += num_samples * 2;
-		frame_count -= num_samples;
-	}
-
-	/* Get the rest from queued frames */
-	while (frame_count) { 
-		while (!(fr = AST_LIST_REMOVE_HEAD(&pvt->frames, frame_list))) {
-			if (!pvt->hookstate) {
-				memset(output + src, 0, frame_count * 2);
-				frame_count = 0;
-				break;
-			}
-			ast_cond_wait(&pvt->cond, &pvt->lock);
-			if (!pvt->hookstate) {
-				res = paAbort;
-				goto return_cleanup;
-			}
-		}
-		num_samples = MIN(fr->samples, frame_count);
-		memcpy(output + src, fr->data, num_samples * 2);
-		frame_count -= num_samples;
-		/* Write what is left over to the write buffer */
-		if (num_samples != fr->samples) {
-			memcpy(&pvt->write_buf + pvt->write_dst, fr->data + num_samples * 2, (fr->samples - num_samples) * 2);
-			pvt->write_dst += (fr->samples - num_samples) * 2;
-		}
-	}
-
-return_cleanup:
-	ast_mutex_unlock(&pvt->lock);
-	if (fr)
-		ast_frfree(fr);
-
-	return res;
+	int res;
+
+	res = jack_ringbuffer_read(pvt->output_rb, output, frame_count * 2);
+	if (res != frame_count * 2) {
+		char *out_buf = output;
+
+		/* ast_log(LOG_WARNING, "Wanted %ld bytes from the output ringbuffer, "
+			"but only got %d\n", frame_count * 2, res); */
+
+		out_buf += res;
+
+		/* zero out any samples that we didn't have data for */
+		memset(out_buf, 0, (frame_count * 2) - res);
+	}
+
+	return paContinue;
 }
 
 static int handle_input(struct console_pvt *pvt, const void *input, unsigned long frame_count)
 {
-	struct ast_frame *fr = &pvt->fr;
-	unsigned int count = 0;
-
-	memset(fr, 0, sizeof(*fr));
-	memset(pvt->read_buf, 0, AST_FRIENDLY_OFFSET);
-	fr->data = pvt->read_buf + AST_FRIENDLY_OFFSET;
-	fr->offset = AST_FRIENDLY_OFFSET;
-	fr->frametype = AST_FRAME_VOICE;
-	fr->subclass = AST_FORMAT_SLINEAR;
-	fr->samples = frame_count;
-	fr->datalen = frame_count * 2;
+	char buf[frame_count * 2];
+	int res;
+	int blah = 0;
 
 	if (pvt->muted) {
-		memcpy(fr->data, input, 
-			MIN(sizeof(pvt->read_buf) - AST_FRIENDLY_OFFSET, frame_count * 2) );
-	} else {
-		/* If muted, just leave the data all zeros */
-		memset(pvt->read_buf + AST_FRIENDLY_OFFSET, 0, 
-			sizeof(pvt->read_buf) - AST_FRIENDLY_OFFSET);
-	}
-
-	/* If the channel is in the process of being hung up, then it will be
-	 * locked while calling the console_hangup callback, and that could be
-	 * waiting to stop the audio stream, which will block until this function
-	 * returns.  Thus, locking the channel could result in a deadlock */
-	while (ast_channel_trylock(pvt->owner)) {
-		if (++count == 100) {
-			ast_debug(1, "Dropping frame\n");
-			pvt->incallback = 0;
-			return paContinue;	
-		}
-		usleep(10);
-	}
-	ast_queue_frame(pvt->owner, fr);
-	ast_channel_unlock(pvt->owner);
+		memset(buf, 0, sizeof(buf));
+		input = buf;
+	}
+	
+	res = jack_ringbuffer_write(pvt->input_rb, input, frame_count * 2);
+	if (res != frame_count * 2) {
+		/* ast_log(LOG_WARNING, "Wanted to write %ld bytes into the input ringbuffer, "
+			"but only wrote %d\n", frame_count * 2, res); */
+	}
+	
+	write(pvt->fds[1], &blah, sizeof(blah));
 
 	return paContinue;
 }
@@ -589,15 +534,11 @@
 {
 	int res = paContinue;
 	struct console_pvt *pvt = &console_pvt;
-
-	pvt->incallback = 1;
 
 	if (output)
 		res = handle_output(pvt, output, frame_count);
 	if (res != paAbort && input)
 		res = handle_input(pvt, input, frame_count);
-
-	pvt->incallback = 0;
 
 	return res;
 }
@@ -1032,22 +973,82 @@
 	return res;
 }
 
-static int unload_module(void)
-{
-	struct console_pvt *pvt = &console_pvt;
-
-	if (pvt->hookstate)
-		stop_stream(pvt);
-
-	Pa_Terminate();
-
-	ast_channel_unregister(&console_tech);
-	ast_cli_unregister_multiple(cli_console, sizeof(cli_console) / sizeof(cli_console[0]));
+static int init_pvt(struct console_pvt *pvt)
+{
+	if (ast_mutex_init(&pvt->lock)) {
+		ast_log(LOG_ERROR, "Failed to initialize mutex\n");
+		return -1;
+	}
+
+	if (!(pvt->smoother = ast_smoother_new(320))) { /* 20ms frames */
+		ast_log(LOG_ERROR, "Failed to create smoother\n");
+		return -1;
+	}
+
+	if (!(pvt->input_rb = jack_ringbuffer_create(RINGBUFFER_SIZE))) {
+		ast_log(LOG_ERROR, "Failed to create input ringbuffer of size %d\n",
+			RINGBUFFER_SIZE);
+		return -1;
+	}
+
+	if (!(pvt->output_rb = jack_ringbuffer_create(RINGBUFFER_SIZE))) {
+		ast_log(LOG_ERROR, "Failed to create output ringbuffer of size %d\n",
+			RINGBUFFER_SIZE);
+		return -1;
+	}
+
+	if (pipe(pvt->fds)) {
+		ast_log(LOG_ERROR, "Unable to create alert pipe: %s\n", strerror(errno));
+		return -1;
+	}
+
+	return 0;
+}
+
+static void destroy_pvt(struct console_pvt *pvt)
+{
+	int i;
 
 	ast_string_field_free_memory(pvt);
 	
 	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
+
+	if (pvt->smoother) {
+		ast_smoother_free(pvt->smoother);
+		pvt->smoother = NULL;
+	}
+
+	if (pvt->input_rb) {
+		jack_ringbuffer_free(pvt->input_rb);
+		pvt->input_rb = NULL;
+	}
+	
+	if (pvt->output_rb) {
+		jack_ringbuffer_free(pvt->output_rb);
+		pvt->output_rb = NULL;
+	}
+
+	for (i = 0; i < 2; i++) {
+		if (pvt->fds[i] > -1) {
+			close(pvt->fds[i]);
+			pvt->fds[i] = -1;
+		}
+	}
+}
+
+static int unload_module(void)
+{
+	struct console_pvt *pvt = &console_pvt;
+
+	if (pvt->hookstate)
+		stop_stream(pvt);
+
+	Pa_Terminate();
+
+	ast_channel_unregister(&console_tech);
+	ast_cli_unregister_multiple(cli_console, sizeof(cli_console) / sizeof(cli_console[0]));
+
+	destroy_pvt(pvt);
 
 	return 0;
 }
@@ -1057,8 +1058,8 @@
 	PaError res;
 	struct console_pvt *pvt = &console_pvt;
 
-	ast_mutex_init(&pvt->lock);
-	ast_cond_init(&pvt->cond, NULL);
+	if (init_pvt(pvt))
+		goto return_error;
 
 	if (load_config(0))
 		goto return_error;
@@ -1087,8 +1088,7 @@
 return_error_pa_init:
 	Pa_Terminate();
 return_error:
-	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
+	destroy_pvt(pvt);
 
 	return AST_MODULE_LOAD_DECLINE;
 }

Modified: team/russell/chan_console/main/frame.c
URL: http://svn.digium.com/view/asterisk/team/russell/chan_console/main/frame.c?view=diff&rev=95353&r1=95352&r2=95353
==============================================================================
--- team/russell/chan_console/main/frame.c (original)
+++ team/russell/chan_console/main/frame.c Sat Dec 29 00:10:50 2007
@@ -69,7 +69,7 @@
 };
 #endif
 
-#define SMOOTHER_SIZE 8000
+#define SMOOTHER_SIZE 32000
 
 enum frame_type {
 	TYPE_HIGH,     /* 0x0 */

Modified: team/russell/chan_console/utils/Makefile
URL: http://svn.digium.com/view/asterisk/team/russell/chan_console/utils/Makefile?view=diff&rev=95353&r1=95352&r2=95353
==============================================================================
--- team/russell/chan_console/utils/Makefile (original)
+++ team/russell/chan_console/utils/Makefile Sat Dec 29 00:10:50 2007
@@ -17,7 +17,7 @@
 .PHONY: clean all uninstall
 
 # to get check_expr, add it to the ALL_UTILS list
-ALL_UTILS:=astman smsq stereorize streamplayer aelparse muted check_expr conf2ael hashtest2 hashtest astcanary
+#ALL_UTILS:=astman smsq stereorize streamplayer aelparse muted check_expr conf2ael hashtest2 hashtest astcanary
 UTILS:=$(ALL_UTILS)
 
 LIBS += $(BKTR_LIB)	# astobj2 with devmode uses backtrace




More information about the svn-commits mailing list