[Asterisk-cvs] asterisk .cleancount,1.7,1.8 channel.c,1.252,1.253

kpfleming kpfleming
Fri Oct 28 19:08:38 CDT 2005


Update of /usr/cvsroot/asterisk
In directory mongoose.digium.com:/tmp/cvs-serv3067

Modified Files:
	.cleancount channel.c 
Log Message:
major redesign of the channel spy infrastructure, increasing efficiency and reducing locking conflicts
(nearly) complete rewrite of app_muxmon, renaming the application to MixMonitor and fixing a large number of bugs and inconsistencies
update app_chanspy to use new spy infrastructure


Index: .cleancount
===================================================================
RCS file: /usr/cvsroot/asterisk/.cleancount,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- .cleancount	30 Aug 2005 21:38:41 -0000	1.7
+++ .cleancount	28 Oct 2005 23:01:13 -0000	1.8
@@ -1 +1 @@
-7
+8

Index: channel.c
===================================================================
RCS file: /usr/cvsroot/asterisk/channel.c,v
retrieving revision 1.252
retrieving revision 1.253
diff -u -d -r1.252 -r1.253
--- channel.c	24 Oct 2005 20:12:04 -0000	1.252
+++ channel.c	28 Oct 2005 23:01:13 -0000	1.253
@@ -71,6 +71,17 @@
 #include "asterisk/transcap.h"
 #include "asterisk/devicestate.h"
 
+struct channel_spy_trans {	/* cached translation path used when feeding spies in one direction */
+	int last_format;		/* frame format (f->subclass) that 'path' was built for */
+	struct ast_trans_pvt *path;	/* translator path to SLINEAR; NULL until one is built */
+};
+
+struct ast_channel_spy_list {	/* per-channel spy state; allocated when the first spy is added, freed with the last */
+	struct channel_spy_trans read_translator;	/* translator used for frames queued with SPY_READ */
+	struct channel_spy_trans write_translator;	/* translator used for frames queued with SPY_WRITE */
+	AST_LIST_HEAD_NOLOCK(, ast_channel_spy) list;	/* the spies attached to the channel */
+};
+
 /* uncomment if you have problems with 'monitoring' synchronized files */
 #if 0
 #define MONITOR_CONSTANT_DELAY
@@ -931,10 +942,8 @@
 	/* loop over the variables list, freeing all data and deleting list items */
 	/* no need to lock the list, as the channel is already locked */
 	
-	while (!AST_LIST_EMPTY(headp)) {           /* List Deletion. */
-	            vardata = AST_LIST_REMOVE_HEAD(headp, entries);
-	            ast_var_delete(vardata);
-	}
+	while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries)))
+		ast_var_delete(vardata);
 
 	free(chan);
 	ast_mutex_unlock(&chlock);
@@ -942,19 +951,134 @@
 	ast_device_state_changed_literal(name);
 }
 
-static void ast_spy_detach(struct ast_channel *chan) 
+int ast_channel_spy_add(struct ast_channel *chan, struct ast_channel_spy *spy)	/* attach 'spy' to 'chan'; returns 0 on success, -1 if the spy is rejected or allocation fails */
 {
-	struct ast_channel_spy *chanspy;
+	if (!ast_test_flag(spy, CHANSPY_FORMAT_AUDIO)) {
+		ast_log(LOG_WARNING, "Could not add channel spy '%s' to channel '%s', only audio format spies are supported.\n",
+			spy->type, chan->name);
+		return -1;
+	}
 
-	/* Marking the spies as done is sufficient.  Chanspy or spy users will get the picture. */
-	for (chanspy = chan->spiers; chanspy; chanspy = chanspy->next) {
-		if (chanspy->status == CHANSPY_RUNNING) {
-			chanspy->status = CHANSPY_DONE;
+	if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST) && (spy->read_queue.format != AST_FORMAT_SLINEAR)) {	/* volume adjustment requires an SLINEAR queue */
+		ast_log(LOG_WARNING, "Cannot provide volume adjustment on '%s' format spies\n",
+			ast_getformatname(spy->read_queue.format));
+		return -1;
+	}
+
+	if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST) && (spy->write_queue.format != AST_FORMAT_SLINEAR)) {
+		ast_log(LOG_WARNING, "Cannot provide volume adjustment on '%s' format spies\n",
+			ast_getformatname(spy->write_queue.format));
+		return -1;
+	}
+
+	if (ast_test_flag(spy, CHANSPY_MIXAUDIO) &&	/* mixing requires both queues to be SLINEAR */
+	    ((spy->read_queue.format != AST_FORMAT_SLINEAR) ||
+	     (spy->write_queue.format != AST_FORMAT_SLINEAR))) {
+		ast_log(LOG_WARNING, "Cannot provide audio mixing on '%s'-'%s' format spies\n",
+			ast_getformatname(spy->read_queue.format), ast_getformatname(spy->write_queue.format));
+		return -1;
+	}
+	/* first spy on this channel: allocate the (zeroed) list container; otherwise append to the existing list */
+	if (!chan->spies) {
+		if (!(chan->spies = calloc(1, sizeof(*chan->spies)))) {
+			ast_log(LOG_WARNING, "Memory allocation failure\n");
+			return -1;
 		}
+
+		AST_LIST_HEAD_INIT_NOLOCK(&chan->spies->list);
+		AST_LIST_INSERT_HEAD(&chan->spies->list, spy, list);
+	} else {
+		AST_LIST_INSERT_TAIL(&chan->spies->list, spy, list);
 	}
 
-	chan->spiers = NULL;
-	return;
+	if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE) {	/* trigger-capable spies always start in read-trigger mode */
+		ast_cond_init(&spy->trigger, NULL);
+		ast_set_flag(spy, CHANSPY_TRIGGER_READ);
+		ast_clear_flag(spy, CHANSPY_TRIGGER_WRITE);
+	}
+
+	ast_log(LOG_DEBUG, "Spy %s added to channel %s\n",
+		spy->type, chan->name);
+
+	return 0;
+}
+
+void ast_channel_spy_stop_by_type(struct ast_channel *chan, const char *type)	/* mark every running spy of 'type' on 'chan' as CHANSPY_DONE */
+{
+	struct ast_channel_spy *spy;
+	
+	if (!chan->spies)
+		return;
+	/* NOTE(review): spy->lock is not taken while status is changed here — confirm callers serialize access */
+	AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+		if ((spy->type == type) && (spy->status == CHANSPY_RUNNING))	/* pointer comparison: 'type' must be the same string object the spy was created with, not merely equal text */
+			spy->status = CHANSPY_DONE;
+	}
+}
+
+void ast_channel_spy_trigger_wait(struct ast_channel_spy *spy)	/* block until spy->trigger is signalled */
+{
+	ast_cond_wait(&spy->trigger, &spy->lock);	/* NOTE(review): presumably the caller must already hold spy->lock, as condition waits normally require — confirm */
+}
+
+void ast_channel_spy_remove(struct ast_channel *chan, struct ast_channel_spy *spy)	/* unlink 'spy' from 'chan', free its queued frames, and free the spy list once it is empty */
+{
+	struct ast_frame *f;
+
+	if (!chan->spies)
+		return;
+
+	AST_LIST_REMOVE(&chan->spies->list, spy, list);
+	/* drain both frame queues (and destroy the trigger, if any) while holding the spy's own lock */
+	ast_mutex_lock(&spy->lock);
+
+	for (f = spy->read_queue.head; f; f = spy->read_queue.head) {
+		spy->read_queue.head = f->next;
+		ast_frfree(f);
+	}
+	for (f = spy->write_queue.head; f; f = spy->write_queue.head) {
+		spy->write_queue.head = f->next;
+		ast_frfree(f);
+	}
+
+	if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE)	/* mirrors the ast_cond_init() done in ast_channel_spy_add() */
+		ast_cond_destroy(&spy->trigger);
+
+	ast_mutex_unlock(&spy->lock);
+
+	ast_log(LOG_DEBUG, "Spy %s removed from channel %s\n",
+		spy->type, chan->name);
+	/* that was the last spy: release the cached translator paths and the list container itself */
+	if (AST_LIST_EMPTY(&chan->spies->list)) {
+		if (chan->spies->read_translator.path)
+			ast_translator_free_path(chan->spies->read_translator.path);
+		if (chan->spies->write_translator.path)
+			ast_translator_free_path(chan->spies->write_translator.path);
+		free(chan->spies);
+		chan->spies = NULL;
+	}
+}
+
+static void detach_spies(struct ast_channel *chan) /* stop and remove every spy attached to 'chan' */
+{
+	struct ast_channel_spy *spy;
+
+	if (!chan->spies)
+		return;
+
+	/* Marking the spies as done is sufficient.  Chanspy or spy users will get the picture. */
+	AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+		ast_mutex_lock(&spy->lock);
+		if (spy->status == CHANSPY_RUNNING)
+			spy->status = CHANSPY_DONE;
+		if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE)	/* wake a thread that may be blocked on the trigger */
+			ast_cond_signal(&spy->trigger);
+		ast_mutex_unlock(&spy->lock);
+	}
+	/* second pass: unlink each spy; ast_channel_spy_remove() frees chan->spies when the last one goes */
+	AST_LIST_TRAVERSE_SAFE_BEGIN(&chan->spies->list, spy, list)
+		ast_channel_spy_remove(chan, spy);
+	AST_LIST_TRAVERSE_SAFE_END;
 }
 
 /*--- ast_softhangup_nolock: Softly hangup a channel, don't lock */
@@ -983,40 +1107,136 @@
 	return res;
 }
 
-static void ast_queue_spy_frame(struct ast_channel_spy *spy, struct ast_frame *f, int pos) 
+enum spy_direction {	/* selects which spy queue and which cached translator a frame is fed to */
+	SPY_READ,	/* use spy->read_queue and the channel's read_translator */
+	SPY_WRITE,	/* use spy->write_queue and the channel's write_translator */
+};
+
+#define SPY_QUEUE_SAMPLE_LIMIT 4000			/* half of one second */
+
+static void queue_frame_to_spies(struct ast_channel *chan, struct ast_frame *f, enum spy_direction dir)	/* append a copy of 'f' to the 'dir' queue of every spy on 'chan'; callers check chan->spies first */
 {
-	struct ast_frame *tmpf = NULL;
-	int count = 0;
+	struct ast_frame *translated_frame = NULL;	/* SLINEAR version of 'f', produced at most once per call */
+	struct ast_channel_spy *spy;
+	struct ast_channel_spy_queue *queue;
+	struct ast_channel_spy_queue *other_queue;
+	struct channel_spy_trans *trans;
+	struct ast_frame *last;
 
-	ast_mutex_lock(&spy->lock);
-	for (tmpf=spy->queue[pos]; tmpf && tmpf->next; tmpf=tmpf->next) {
-		count++;
-	}
-	if (count > 1000) {
-		struct ast_frame *freef, *headf;
+	trans = (dir == SPY_READ) ? &chan->spies->read_translator : &chan->spies->write_translator;
 
-		ast_log(LOG_ERROR, "Too many frames queued at once, flushing cache.\n");
-		headf = spy->queue[pos];
-		/* deref the queue right away so it looks empty */
-		spy->queue[pos] = NULL;
-		tmpf = headf;
-		/* free the wasted frames */
-		while (tmpf) {
-			freef = tmpf;
-			tmpf = tmpf->next;
-			ast_frfree(freef);
+	AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+		ast_mutex_lock(&spy->lock);
+
+		queue = (dir == SPY_READ) ? &spy->read_queue : &spy->write_queue;
+		/* spy wants SLINEAR but the frame is in another format: translate once and reuse the result for later spies */
+		if ((queue->format == AST_FORMAT_SLINEAR) && (f->subclass != AST_FORMAT_SLINEAR)) {
+			if (!translated_frame) {
+				if (trans->path && (trans->last_format != f->subclass)) {	/* cached path was built for a different source format */
+					ast_translator_free_path(trans->path);
+					trans->path = NULL;
+				}
+				if (!trans->path) {
+					ast_log(LOG_DEBUG, "Building translator from %s to SLINEAR for spies on channel %s\n",
+						ast_getformatname(f->subclass), chan->name);
+					if ((trans->path = ast_translator_build_path(AST_FORMAT_SLINEAR, f->subclass)) == NULL) {
+						ast_log(LOG_WARNING, "Cannot build a path from %s to %s\n",
+							ast_getformatname(f->subclass), ast_getformatname(AST_FORMAT_SLINEAR));
+						ast_mutex_unlock(&spy->lock);
+						continue;
+					} else {
+						trans->last_format = f->subclass;
+					}
+				}
+				translated_frame = ast_translate(trans->path, f, 0);	/* NOTE(review): result is not checked before ast_frdup() below — confirm ast_translate() cannot return NULL here */
+			}
+
+			for (last = queue->head; last && last->next; last = last->next);	/* walk to the tail of the queue */
+			if (last)
+				last->next = ast_frdup(translated_frame);
+			else
+				queue->head = ast_frdup(translated_frame);
+		} else {
+			if (f->subclass != queue->format) {
+				ast_log(LOG_WARNING, "Spy '%s' on channel '%s' wants format '%s', but frame is '%s', dropping\n",
+					spy->type, chan->name,
+					ast_getformatname(queue->format), ast_getformatname(f->subclass));
+				ast_mutex_unlock(&spy->lock);
+				continue;
+			}
+
+			for (last = queue->head; last && last->next; last = last->next);	/* walk to the tail of the queue */
+			if (last)
+				last->next = ast_frdup(f);
+			else
+				queue->head = ast_frdup(f);
 		}
-		ast_mutex_unlock(&spy->lock);
-		return;
-	}
 
-	if (tmpf) {
-		tmpf->next = ast_frdup(f);
-	} else {
-		spy->queue[pos] = ast_frdup(f);
+		queue->samples += f->samples;	/* NOTE(review): uses the original frame's count even on the translated path, and even if ast_frdup() failed — confirm */
+		/* over the limit: a triggered spy whose other queue is empty gets a flush request; otherwise the oldest frames are dropped */
+		if (queue->samples > SPY_QUEUE_SAMPLE_LIMIT) {
+			if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE) {
+				other_queue = (dir == SPY_WRITE) ? &spy->read_queue : &spy->write_queue;
+
+				if (other_queue->samples == 0) {
+					switch (ast_test_flag(spy, CHANSPY_TRIGGER_MODE)) {	/* retarget the trigger to the direction that is actually producing audio */
+					case CHANSPY_TRIGGER_READ:
+						if (dir == SPY_WRITE) {
+							ast_set_flag(spy, CHANSPY_TRIGGER_WRITE);
+							ast_clear_flag(spy, CHANSPY_TRIGGER_READ);
+							if (option_debug)
+								ast_log(LOG_DEBUG, "Switching spy '%s' on '%s' to write-trigger mode\n",
+									spy->type, chan->name);
+						}
+						break;
+					case CHANSPY_TRIGGER_WRITE:
+						if (dir == SPY_READ) {
+							ast_set_flag(spy, CHANSPY_TRIGGER_READ);
+							ast_clear_flag(spy, CHANSPY_TRIGGER_WRITE);
+							if (option_debug)
+								ast_log(LOG_DEBUG, "Switching spy '%s' on '%s' to read-trigger mode\n",
+									spy->type, chan->name);
+						}
+						break;
+					}
+					if (option_debug)
+						ast_log(LOG_DEBUG, "Triggering queue flush for spy '%s' on '%s'\n",
+							spy->type, chan->name);
+					ast_set_flag(spy, CHANSPY_TRIGGER_FLUSH);
+					ast_cond_signal(&spy->trigger);
+					ast_mutex_unlock(&spy->lock);
+					continue;
+				}
+			}
+
+			if (option_debug)
+				ast_log(LOG_DEBUG, "Spy '%s' on channel '%s' %s queue too long, dropping frames\n",
+					spy->type, chan->name, (dir == SPY_READ) ? "read" : "write");
+			while (queue->samples > SPY_QUEUE_SAMPLE_LIMIT) {	/* discard from the head (oldest first) until back under the limit */
+				struct ast_frame *drop = queue->head;
+
+				queue->samples -= drop->samples;
+				queue->head = drop->next;
+				ast_frfree(drop);
+			}
+		} else {	/* within the limit: signal the spy only when this direction is its current trigger */
+			switch (ast_test_flag(spy, CHANSPY_TRIGGER_MODE)) {
+			case CHANSPY_TRIGGER_READ:
+				if (dir == SPY_READ)
+					ast_cond_signal(&spy->trigger);
+				break;
+			case CHANSPY_TRIGGER_WRITE:
+				if (dir == SPY_WRITE)
+					ast_cond_signal(&spy->trigger);
+				break;
+			}
+		}
+
+		ast_mutex_unlock(&spy->lock);
 	}
 
-	ast_mutex_unlock(&spy->lock);
+	if (translated_frame)	/* each spy received its own duplicate, so the shared translation can be released */
+		ast_frfree(translated_frame);
 }
 
 static void free_translation(struct ast_channel *clone)
@@ -1040,7 +1260,7 @@
 	   if someone is going to masquerade as us */
 	ast_mutex_lock(&chan->lock);
 
-	ast_spy_detach(chan);		/* get rid of spies */
+	detach_spies(chan);		/* get rid of spies */
 
 	if (chan->masq) {
 		if (ast_do_masquerade(chan)) 
@@ -1174,20 +1394,28 @@
 int ast_activate_generator(struct ast_channel *chan, struct ast_generator *gen, void *params)
 {
 	int res = 0;
+
 	ast_mutex_lock(&chan->lock);
+
 	if (chan->generatordata) {
 		if (chan->generator && chan->generator->release)
 			chan->generator->release(chan, chan->generatordata);
 		chan->generatordata = NULL;
 	}
+
 	ast_prod(chan);
-	if ((chan->generatordata = gen->alloc(chan, params))) {
+	if (gen->alloc) {	/* alloc is optional: a generator without one is installed with no private data */
+		if (!(chan->generatordata = gen->alloc(chan, params)))
+			res = -1;
+	}
+	
+	if (!res) {	/* install the generator unless its alloc callback failed */
 		ast_settimeout(chan, 160, generator_force, chan);
 		chan->generator = gen;
-	} else {
-		res = -1;
 	}
+
 	ast_mutex_unlock(&chan->lock);
+
 	return res;
 }
 
@@ -1661,12 +1889,9 @@
 			ast_frfree(f);
 			f = &null_frame;
 		} else {
-			if (chan->spiers) {
-				struct ast_channel_spy *spying;
-				for (spying = chan->spiers; spying; spying=spying->next) {
-					ast_queue_spy_frame(spying, f, 0);
-				}
-			}
+			if (chan->spies)
+				queue_frame_to_spies(chan, f, SPY_READ);
+
 			if (chan->monitor && chan->monitor->read_stream ) {
 #ifndef MONITOR_CONSTANT_DELAY
 				int jump = chan->outsmpl - chan->insmpl - 2 * f->samples;
@@ -2007,17 +2232,10 @@
 		break;
 	default:
 		if (chan->tech->write) {
-			if (chan->writetrans) 
-				f = ast_translate(chan->writetrans, fr, 0);
-			else
-				f = fr;
+			f = (chan->writetrans) ? ast_translate(chan->writetrans, fr, 0) : fr;
 			if (f) {
-				if (f->frametype == AST_FRAME_VOICE && chan->spiers) {
-					struct ast_channel_spy *spying;
-					for (spying = chan->spiers; spying; spying=spying->next) {
-						ast_queue_spy_frame(spying, f, 1);
-					}
-				}
+				if (f->frametype == AST_FRAME_VOICE && chan->spies)
+					queue_frame_to_spies(chan, f, SPY_WRITE);
 
 				if( chan->monitor && chan->monitor->write_stream &&
 						f && ( f->frametype == AST_FRAME_VOICE ) ) {
@@ -3207,8 +3425,9 @@
 		if (c0->tech->bridge &&
 		    (config->timelimit == 0) &&
 		    (c0->tech->bridge == c1->tech->bridge) &&
-		    !nativefailed && !c0->monitor && !c1->monitor && !c0->spiers && !c1->spiers) {
-			/* Looks like they share a bridge method */
+		    !nativefailed && !c0->monitor && !c1->monitor &&
+		    !c0->spies && !c1->spies) {
+			/* Looks like they share a bridge method and nothing else is in the way */
 			if (option_verbose > 2) 
 				ast_verbose(VERBOSE_PREFIX_3 "Attempting native bridge of %s and %s\n", c0->name, c1->name);
 			ast_set_flag(c0, AST_FLAG_NBRIDGE);
@@ -3237,6 +3456,7 @@
 			} else {
 				ast_clear_flag(c0, AST_FLAG_NBRIDGE);
 				ast_clear_flag(c1, AST_FLAG_NBRIDGE);
+				ast_verbose(VERBOSE_PREFIX_3 "Native bridge of %s and %s was unsuccessful\n", c0->name, c1->name);
 			}
 			if (res == AST_BRIDGE_RETRY)
 				continue;
@@ -3570,3 +3790,134 @@
 	for (cur = vars; cur; cur = cur->next)
 		pbx_builtin_setvar_helper(chan, cur->name, cur->value);	
 }
+
+static void copy_data_from_queue(struct ast_channel_spy_queue *queue, short *buf, unsigned int samples)	/* move up to 'samples' samples from the head of 'queue' into 'buf', consuming frames as they empty */
+{
+	struct ast_frame *f;
+	int tocopy;		/* samples taken from the current frame */
+	int bytestocopy;	/* byte length of those samples in the queue's format */
+
+	while (samples) {
+		f = queue->head;
+
+		if (!f) {	/* queue exhausted early; the rest of 'buf' is left untouched */
+			ast_log(LOG_ERROR, "Ran out of frames before buffer filled!\n");
+			break;
+		}
+		/* size the copy by what this frame supplies (tocopy), never by the total still wanted, or memcpy overruns the frame */
+		tocopy = (f->samples > samples) ? samples : f->samples;
+		bytestocopy = ast_codec_get_len(queue->format, tocopy);
+		memcpy(buf, f->data, bytestocopy);
+		samples -= tocopy;
+		buf += tocopy;	/* NOTE(review): advancing by samples on a short* matches bytestocopy only for 16-bit formats such as SLINEAR — confirm */
+		f->samples -= tocopy;
+		f->data += bytestocopy;	/* a partly consumed frame stays at the head, trimmed to its unread tail */
+		f->datalen -= bytestocopy;
+		f->offset += bytestocopy;
+		queue->samples -= tocopy;
+		if (!f->samples) {	/* frame fully consumed: unlink and free it */
+			queue->head = f->next;
+			ast_frfree(f);
+		}
+	}
+}
+
+struct ast_frame *ast_channel_spy_read_frame(struct ast_channel_spy *spy, unsigned int samples)	/* return 'samples' of queued audio from both directions (or a flushed backlog), NULL if not enough is queued */
+{
+	struct ast_frame *result;
+	/* buffers are allocated to hold SLINEAR, which is the largest format */
+        short read_buf[samples];	/* NOTE(review): variable-length arrays sized by the caller's 'samples'; no upper bound is enforced here */
+        short write_buf[samples];
+	struct ast_frame *read_frame;
+	struct ast_frame *write_frame;
+	int need_dup;	/* set when the frames live on this function's stack and must be copied before returning */
+	struct ast_frame stack_read_frame = { .frametype = AST_FRAME_VOICE,
+					      .subclass = spy->read_queue.format,
+					      .data = read_buf,
+					      .samples = samples,
+					      .datalen = ast_codec_get_len(spy->read_queue.format, samples),
+	};
+	struct ast_frame stack_write_frame = { .frametype = AST_FRAME_VOICE,
+					       .subclass = spy->write_queue.format,
+					       .data = write_buf,
+					       .samples = samples,
+					       .datalen = ast_codec_get_len(spy->write_queue.format, samples),
+	};
+
+	/* if a flush has been requested, dump everything in whichever queue is larger */
+	if (ast_test_flag(spy, CHANSPY_TRIGGER_FLUSH)) {	/* the whole chain is handed to the caller, linked via ->next; 'samples' is ignored */
+		if (spy->read_queue.samples > spy->write_queue.samples) {
+			if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST)) {
+				for (result = spy->read_queue.head; result; result = result->next)
+					ast_frame_adjust_volume(result, spy->read_vol_adjustment);
+			}
+			result = spy->read_queue.head;
+			spy->read_queue.head = NULL;
+			spy->read_queue.samples = 0;
+			ast_clear_flag(spy, CHANSPY_TRIGGER_FLUSH);
+			return result;
+		} else {	/* write queue is larger, or both are equal */
+			if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST)) {
+				for (result = spy->write_queue.head; result; result = result->next)
+					ast_frame_adjust_volume(result, spy->write_vol_adjustment);
+			}
+			result = spy->write_queue.head;
+			spy->write_queue.head = NULL;
+			spy->write_queue.samples = 0;
+			ast_clear_flag(spy, CHANSPY_TRIGGER_FLUSH);
+			return result;
+		}
+	}
+
+	if ((spy->read_queue.samples < samples) || (spy->write_queue.samples < samples))	/* both directions must hold at least 'samples' */
+		return NULL;
+
+	/* short-circuit if both head frames have exactly what we want */
+	if ((spy->read_queue.head->samples == samples) &&
+	    (spy->write_queue.head->samples == samples)) {
+		read_frame = spy->read_queue.head;
+		spy->read_queue.head = read_frame->next;
+		read_frame->next = NULL;
+
+		write_frame = spy->write_queue.head;
+		spy->write_queue.head = write_frame->next;
+		write_frame->next = NULL;
+
+		spy->read_queue.samples -= samples;
+		spy->write_queue.samples -= samples;
+
+		need_dup = 0;
+	} else {	/* otherwise assemble 'samples' from each queue into the stack buffers */
+		copy_data_from_queue(&spy->read_queue, read_buf, samples);
+		copy_data_from_queue(&spy->write_queue, write_buf, samples);
+
+		read_frame = &stack_read_frame;
+		write_frame = &stack_write_frame;
+		need_dup = 1;
+	}
+	
+	if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST))
+		ast_frame_adjust_volume(read_frame, spy->read_vol_adjustment);
+
+	if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST))
+		ast_frame_adjust_volume(write_frame, spy->write_vol_adjustment);
+	/* mixing spies get a single frame built from read_frame; others get read_frame with write_frame linked via ->next */
+	if (ast_test_flag(spy, CHANSPY_MIXAUDIO)) {
+		ast_frame_slinear_sum(read_frame, write_frame);
+
+		if (need_dup)
+			result = ast_frdup(read_frame);
+		else
+			result = read_frame;	/* NOTE(review): the dequeued write_frame is not freed or returned on this path — confirm ownership */
+	} else {
+		if (need_dup) {
+			result = ast_frdup(read_frame);	/* NOTE(review): not checked for NULL before the ->next store below */
+			result->next = ast_frdup(write_frame);
+		} else {
+			result = read_frame;
+			result->next = write_frame;
+		}
+	}
+
+	return result;
+}




More information about the svn-commits mailing list