[Asterisk-cvs] asterisk .cleancount,1.7,1.8 channel.c,1.252,1.253
kpfleming
kpfleming
Fri Oct 28 19:08:38 CDT 2005
Update of /usr/cvsroot/asterisk
In directory mongoose.digium.com:/tmp/cvs-serv3067
Modified Files:
.cleancount channel.c
Log Message:
major redesign of the channel spy infrastructure, increasing efficiency and reducing locking conflicts
(nearly) complete rewrite of app_muxmon, renaming the application to MixMonitor and fixing a large number of bugs and inconsistencies
update app_chanspy to use new spy infrastructure
Index: .cleancount
===================================================================
RCS file: /usr/cvsroot/asterisk/.cleancount,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- .cleancount 30 Aug 2005 21:38:41 -0000 1.7
+++ .cleancount 28 Oct 2005 23:01:13 -0000 1.8
@@ -1 +1 @@
-7
+8
Index: channel.c
===================================================================
RCS file: /usr/cvsroot/asterisk/channel.c,v
retrieving revision 1.252
retrieving revision 1.253
diff -u -d -r1.252 -r1.253
--- channel.c 24 Oct 2005 20:12:04 -0000 1.252
+++ channel.c 28 Oct 2005 23:01:13 -0000 1.253
@@ -71,6 +71,17 @@
#include "asterisk/transcap.h"
#include "asterisk/devicestate.h"
+struct channel_spy_trans {
+ int last_format;
+ struct ast_trans_pvt *path;
+};
+
+struct ast_channel_spy_list {
+ struct channel_spy_trans read_translator;
+ struct channel_spy_trans write_translator;
+ AST_LIST_HEAD_NOLOCK(, ast_channel_spy) list;
+};
+
/* uncomment if you have problems with 'monitoring' synchronized files */
#if 0
#define MONITOR_CONSTANT_DELAY
@@ -931,10 +942,8 @@
/* loop over the variables list, freeing all data and deleting list items */
/* no need to lock the list, as the channel is already locked */
- while (!AST_LIST_EMPTY(headp)) { /* List Deletion. */
- vardata = AST_LIST_REMOVE_HEAD(headp, entries);
- ast_var_delete(vardata);
- }
+ while ((vardata = AST_LIST_REMOVE_HEAD(headp, entries)))
+ ast_var_delete(vardata);
free(chan);
ast_mutex_unlock(&chlock);
@@ -942,19 +951,134 @@
ast_device_state_changed_literal(name);
}
-static void ast_spy_detach(struct ast_channel *chan)
+int ast_channel_spy_add(struct ast_channel *chan, struct ast_channel_spy *spy)
{
- struct ast_channel_spy *chanspy;
+ if (!ast_test_flag(spy, CHANSPY_FORMAT_AUDIO)) {
+ ast_log(LOG_WARNING, "Could not add channel spy '%s' to channel '%s', only audio format spies are supported.\n",
+ spy->type, chan->name);
+ return -1;
+ }
- /* Marking the spies as done is sufficient. Chanspy or spy users will get the picture. */
- for (chanspy = chan->spiers; chanspy; chanspy = chanspy->next) {
- if (chanspy->status == CHANSPY_RUNNING) {
- chanspy->status = CHANSPY_DONE;
+ if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST) && (spy->read_queue.format != AST_FORMAT_SLINEAR)) {
+ ast_log(LOG_WARNING, "Cannot provide volume adjustment on '%s' format spies\n",
+ ast_getformatname(spy->read_queue.format));
+ return -1;
+ }
+
+ if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST) && (spy->write_queue.format != AST_FORMAT_SLINEAR)) {
+ ast_log(LOG_WARNING, "Cannot provide volume adjustment on '%s' format spies\n",
+ ast_getformatname(spy->write_queue.format));
+ return -1;
+ }
+
+ if (ast_test_flag(spy, CHANSPY_MIXAUDIO) &&
+ ((spy->read_queue.format != AST_FORMAT_SLINEAR) ||
+ (spy->write_queue.format != AST_FORMAT_SLINEAR))) {
+ ast_log(LOG_WARNING, "Cannot provide audio mixing on '%s'-'%s' format spies\n",
+ ast_getformatname(spy->read_queue.format), ast_getformatname(spy->write_queue.format));
+ return -1;
+ }
+
+ if (!chan->spies) {
+ if (!(chan->spies = calloc(1, sizeof(*chan->spies)))) {
+ ast_log(LOG_WARNING, "Memory allocation failure\n");
+ return -1;
}
+
+ AST_LIST_HEAD_INIT_NOLOCK(&chan->spies->list);
+ AST_LIST_INSERT_HEAD(&chan->spies->list, spy, list);
+ } else {
+ AST_LIST_INSERT_TAIL(&chan->spies->list, spy, list);
}
- chan->spiers = NULL;
- return;
+ if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE) {
+ ast_cond_init(&spy->trigger, NULL);
+ ast_set_flag(spy, CHANSPY_TRIGGER_READ);
+ ast_clear_flag(spy, CHANSPY_TRIGGER_WRITE);
+ }
+
+ ast_log(LOG_DEBUG, "Spy %s added to channel %s\n",
+ spy->type, chan->name);
+
+ return 0;
+}
+
+void ast_channel_spy_stop_by_type(struct ast_channel *chan, const char *type)
+{
+ struct ast_channel_spy *spy;
+
+ if (!chan->spies)
+ return;
+
+ AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+ if ((spy->type == type) && (spy->status == CHANSPY_RUNNING))
+ spy->status = CHANSPY_DONE;
+ }
+}
+
+void ast_channel_spy_trigger_wait(struct ast_channel_spy *spy)
+{
+ ast_cond_wait(&spy->trigger, &spy->lock);
+}
+
+void ast_channel_spy_remove(struct ast_channel *chan, struct ast_channel_spy *spy)
+{
+ struct ast_frame *f;
+
+ if (!chan->spies)
+ return;
+
+ AST_LIST_REMOVE(&chan->spies->list, spy, list);
+
+ ast_mutex_lock(&spy->lock);
+
+ for (f = spy->read_queue.head; f; f = spy->read_queue.head) {
+ spy->read_queue.head = f->next;
+ ast_frfree(f);
+ }
+ for (f = spy->write_queue.head; f; f = spy->write_queue.head) {
+ spy->write_queue.head = f->next;
+ ast_frfree(f);
+ }
+
+ if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE)
+ ast_cond_destroy(&spy->trigger);
+
+ ast_mutex_unlock(&spy->lock);
+
+ ast_log(LOG_DEBUG, "Spy %s removed from channel %s\n",
+ spy->type, chan->name);
+
+ if (AST_LIST_EMPTY(&chan->spies->list)) {
+ if (chan->spies->read_translator.path)
+ ast_translator_free_path(chan->spies->read_translator.path);
+ if (chan->spies->write_translator.path)
+ ast_translator_free_path(chan->spies->write_translator.path);
+ free(chan->spies);
+ chan->spies = NULL;
+ }
+}
+
+static void detach_spies(struct ast_channel *chan)
+{
+ struct ast_channel_spy *spy;
+
+ if (!chan->spies)
+ return;
+
+ /* Marking the spies as done is sufficient. Chanspy or spy users will get the picture. */
+ AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+ ast_mutex_lock(&spy->lock);
+ if (spy->status == CHANSPY_RUNNING)
+ spy->status = CHANSPY_DONE;
+ if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE)
+ ast_cond_signal(&spy->trigger);
+ ast_mutex_unlock(&spy->lock);
+ }
+
+ AST_LIST_TRAVERSE_SAFE_BEGIN(&chan->spies->list, spy, list)
+ ast_channel_spy_remove(chan, spy);
+ AST_LIST_TRAVERSE_SAFE_END;
}
/*--- ast_softhangup_nolock: Softly hangup a channel, don't lock */
@@ -983,40 +1107,136 @@
return res;
}
-static void ast_queue_spy_frame(struct ast_channel_spy *spy, struct ast_frame *f, int pos)
+enum spy_direction {
+ SPY_READ,
+ SPY_WRITE,
+};
+
+#define SPY_QUEUE_SAMPLE_LIMIT 4000 /* half of one second */
+
+static void queue_frame_to_spies(struct ast_channel *chan, struct ast_frame *f, enum spy_direction dir)
{
- struct ast_frame *tmpf = NULL;
- int count = 0;
+ struct ast_frame *translated_frame = NULL;
+ struct ast_channel_spy *spy;
+ struct ast_channel_spy_queue *queue;
+ struct ast_channel_spy_queue *other_queue;
+ struct channel_spy_trans *trans;
+ struct ast_frame *last;
- ast_mutex_lock(&spy->lock);
- for (tmpf=spy->queue[pos]; tmpf && tmpf->next; tmpf=tmpf->next) {
- count++;
- }
- if (count > 1000) {
- struct ast_frame *freef, *headf;
+ trans = (dir == SPY_READ) ? &chan->spies->read_translator : &chan->spies->write_translator;
- ast_log(LOG_ERROR, "Too many frames queued at once, flushing cache.\n");
- headf = spy->queue[pos];
- /* deref the queue right away so it looks empty */
- spy->queue[pos] = NULL;
- tmpf = headf;
- /* free the wasted frames */
- while (tmpf) {
- freef = tmpf;
- tmpf = tmpf->next;
- ast_frfree(freef);
+ AST_LIST_TRAVERSE(&chan->spies->list, spy, list) {
+ ast_mutex_lock(&spy->lock);
+
+ queue = (dir == SPY_READ) ? &spy->read_queue : &spy->write_queue;
+
+ if ((queue->format == AST_FORMAT_SLINEAR) && (f->subclass != AST_FORMAT_SLINEAR)) {
+ if (!translated_frame) {
+ if (trans->path && (trans->last_format != f->subclass)) {
+ ast_translator_free_path(trans->path);
+ trans->path = NULL;
+ }
+ if (!trans->path) {
+ ast_log(LOG_DEBUG, "Building translator from %s to SLINEAR for spies on channel %s\n",
+ ast_getformatname(f->subclass), chan->name);
+ if ((trans->path = ast_translator_build_path(AST_FORMAT_SLINEAR, f->subclass)) == NULL) {
+ ast_log(LOG_WARNING, "Cannot build a path from %s to %s\n",
+ ast_getformatname(f->subclass), ast_getformatname(AST_FORMAT_SLINEAR));
+ ast_mutex_unlock(&spy->lock);
+ continue;
+ } else {
+ trans->last_format = f->subclass;
+ }
+ }
+ translated_frame = ast_translate(trans->path, f, 0);
+ }
+
+ for (last = queue->head; last && last->next; last = last->next);
+ if (last)
+ last->next = ast_frdup(translated_frame);
+ else
+ queue->head = ast_frdup(translated_frame);
+ } else {
+ if (f->subclass != queue->format) {
+ ast_log(LOG_WARNING, "Spy '%s' on channel '%s' wants format '%s', but frame is '%s', dropping\n",
+ spy->type, chan->name,
+ ast_getformatname(queue->format), ast_getformatname(f->subclass));
+ ast_mutex_unlock(&spy->lock);
+ continue;
+ }
+
+ for (last = queue->head; last && last->next; last = last->next);
+ if (last)
+ last->next = ast_frdup(f);
+ else
+ queue->head = ast_frdup(f);
}
- ast_mutex_unlock(&spy->lock);
- return;
- }
- if (tmpf) {
- tmpf->next = ast_frdup(f);
- } else {
- spy->queue[pos] = ast_frdup(f);
+ queue->samples += f->samples;
+
+ if (queue->samples > SPY_QUEUE_SAMPLE_LIMIT) {
+ if (ast_test_flag(spy, CHANSPY_TRIGGER_MODE) != CHANSPY_TRIGGER_NONE) {
+ other_queue = (dir == SPY_WRITE) ? &spy->read_queue : &spy->write_queue;
+
+ if (other_queue->samples == 0) {
+ switch (ast_test_flag(spy, CHANSPY_TRIGGER_MODE)) {
+ case CHANSPY_TRIGGER_READ:
+ if (dir == SPY_WRITE) {
+ ast_set_flag(spy, CHANSPY_TRIGGER_WRITE);
+ ast_clear_flag(spy, CHANSPY_TRIGGER_READ);
+ if (option_debug)
+ ast_log(LOG_DEBUG, "Switching spy '%s' on '%s' to write-trigger mode\n",
+ spy->type, chan->name);
+ }
+ break;
+ case CHANSPY_TRIGGER_WRITE:
+ if (dir == SPY_READ) {
+ ast_set_flag(spy, CHANSPY_TRIGGER_READ);
+ ast_clear_flag(spy, CHANSPY_TRIGGER_WRITE);
+ if (option_debug)
+ ast_log(LOG_DEBUG, "Switching spy '%s' on '%s' to read-trigger mode\n",
+ spy->type, chan->name);
+ }
+ break;
+ }
+ if (option_debug)
+ ast_log(LOG_DEBUG, "Triggering queue flush for spy '%s' on '%s'\n",
+ spy->type, chan->name);
+ ast_set_flag(spy, CHANSPY_TRIGGER_FLUSH);
+ ast_cond_signal(&spy->trigger);
+ ast_mutex_unlock(&spy->lock);
+ continue;
+ }
+ }
+
+ if (option_debug)
+ ast_log(LOG_DEBUG, "Spy '%s' on channel '%s' %s queue too long, dropping frames\n",
+ spy->type, chan->name, (dir == SPY_READ) ? "read" : "write");
+ while (queue->samples > SPY_QUEUE_SAMPLE_LIMIT) {
+ struct ast_frame *drop = queue->head;
+
+ queue->samples -= drop->samples;
+ queue->head = drop->next;
+ ast_frfree(drop);
+ }
+ } else {
+ switch (ast_test_flag(spy, CHANSPY_TRIGGER_MODE)) {
+ case CHANSPY_TRIGGER_READ:
+ if (dir == SPY_READ)
+ ast_cond_signal(&spy->trigger);
+ break;
+ case CHANSPY_TRIGGER_WRITE:
+ if (dir == SPY_WRITE)
+ ast_cond_signal(&spy->trigger);
+ break;
+ }
+ }
+
+ ast_mutex_unlock(&spy->lock);
}
- ast_mutex_unlock(&spy->lock);
+ if (translated_frame)
+ ast_frfree(translated_frame);
}
static void free_translation(struct ast_channel *clone)
@@ -1040,7 +1260,7 @@
if someone is going to masquerade as us */
ast_mutex_lock(&chan->lock);
- ast_spy_detach(chan); /* get rid of spies */
+ detach_spies(chan); /* get rid of spies */
if (chan->masq) {
if (ast_do_masquerade(chan))
@@ -1174,20 +1394,28 @@
int ast_activate_generator(struct ast_channel *chan, struct ast_generator *gen, void *params)
{
int res = 0;
+
ast_mutex_lock(&chan->lock);
+
if (chan->generatordata) {
if (chan->generator && chan->generator->release)
chan->generator->release(chan, chan->generatordata);
chan->generatordata = NULL;
}
+
ast_prod(chan);
- if ((chan->generatordata = gen->alloc(chan, params))) {
+ if (gen->alloc) {
+ if (!(chan->generatordata = gen->alloc(chan, params)))
+ res = -1;
+ }
+
+ if (!res) {
ast_settimeout(chan, 160, generator_force, chan);
chan->generator = gen;
- } else {
- res = -1;
}
+
ast_mutex_unlock(&chan->lock);
+
return res;
}
@@ -1661,12 +1889,9 @@
ast_frfree(f);
f = &null_frame;
} else {
- if (chan->spiers) {
- struct ast_channel_spy *spying;
- for (spying = chan->spiers; spying; spying=spying->next) {
- ast_queue_spy_frame(spying, f, 0);
- }
- }
+ if (chan->spies)
+ queue_frame_to_spies(chan, f, SPY_READ);
+
if (chan->monitor && chan->monitor->read_stream ) {
#ifndef MONITOR_CONSTANT_DELAY
int jump = chan->outsmpl - chan->insmpl - 2 * f->samples;
@@ -2007,17 +2232,10 @@
break;
default:
if (chan->tech->write) {
- if (chan->writetrans)
- f = ast_translate(chan->writetrans, fr, 0);
- else
- f = fr;
+ f = (chan->writetrans) ? ast_translate(chan->writetrans, fr, 0) : fr;
if (f) {
- if (f->frametype == AST_FRAME_VOICE && chan->spiers) {
- struct ast_channel_spy *spying;
- for (spying = chan->spiers; spying; spying=spying->next) {
- ast_queue_spy_frame(spying, f, 1);
- }
- }
+ if (f->frametype == AST_FRAME_VOICE && chan->spies)
+ queue_frame_to_spies(chan, f, SPY_WRITE);
if( chan->monitor && chan->monitor->write_stream &&
f && ( f->frametype == AST_FRAME_VOICE ) ) {
@@ -3207,8 +3425,9 @@
if (c0->tech->bridge &&
(config->timelimit == 0) &&
(c0->tech->bridge == c1->tech->bridge) &&
- !nativefailed && !c0->monitor && !c1->monitor && !c0->spiers && !c1->spiers) {
- /* Looks like they share a bridge method */
+ !nativefailed && !c0->monitor && !c1->monitor &&
+ !c0->spies && !c1->spies) {
+ /* Looks like they share a bridge method and nothing else is in the way */
if (option_verbose > 2)
ast_verbose(VERBOSE_PREFIX_3 "Attempting native bridge of %s and %s\n", c0->name, c1->name);
ast_set_flag(c0, AST_FLAG_NBRIDGE);
@@ -3237,6 +3456,7 @@
} else {
ast_clear_flag(c0, AST_FLAG_NBRIDGE);
ast_clear_flag(c1, AST_FLAG_NBRIDGE);
+ ast_verbose(VERBOSE_PREFIX_3 "Native bridge of %s and %s was unsuccessful\n", c0->name, c1->name);
}
if (res == AST_BRIDGE_RETRY)
continue;
@@ -3570,3 +3790,134 @@
for (cur = vars; cur; cur = cur->next)
pbx_builtin_setvar_helper(chan, cur->name, cur->value);
}
+
+static void copy_data_from_queue(struct ast_channel_spy_queue *queue, short *buf, unsigned int samples)
+{
+ struct ast_frame *f;
+ int tocopy;
+ int bytestocopy;
+
+ while (samples) {
+ f = queue->head;
+
+ if (!f) {
+ ast_log(LOG_ERROR, "Ran out of frames before buffer filled!\n");
+ break;
+ }
+
+ tocopy = (f->samples > samples) ? samples : f->samples;
+ bytestocopy = ast_codec_get_len(queue->format, samples);
+ memcpy(buf, f->data, bytestocopy);
+ samples -= tocopy;
+ buf += tocopy;
+ f->samples -= tocopy;
+ f->data += bytestocopy;
+ f->datalen -= bytestocopy;
+ f->offset += bytestocopy;
+ queue->samples -= tocopy;
+ if (!f->samples) {
+ queue->head = f->next;
+ ast_frfree(f);
+ }
+ }
+}
+
+struct ast_frame *ast_channel_spy_read_frame(struct ast_channel_spy *spy, unsigned int samples)
+{
+ struct ast_frame *result;
+ /* buffers are allocated to hold SLINEAR, which is the largest format */
+ short read_buf[samples];
+ short write_buf[samples];
+ struct ast_frame *read_frame;
+ struct ast_frame *write_frame;
+ int need_dup;
+ struct ast_frame stack_read_frame = { .frametype = AST_FRAME_VOICE,
+ .subclass = spy->read_queue.format,
+ .data = read_buf,
+ .samples = samples,
+ .datalen = ast_codec_get_len(spy->read_queue.format, samples),
+ };
+ struct ast_frame stack_write_frame = { .frametype = AST_FRAME_VOICE,
+ .subclass = spy->write_queue.format,
+ .data = write_buf,
+ .samples = samples,
+ .datalen = ast_codec_get_len(spy->write_queue.format, samples),
+ };
+
+ /* if a flush has been requested, dump everything in whichever queue is larger */
+ if (ast_test_flag(spy, CHANSPY_TRIGGER_FLUSH)) {
+ if (spy->read_queue.samples > spy->write_queue.samples) {
+ if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST)) {
+ for (result = spy->read_queue.head; result; result = result->next)
+ ast_frame_adjust_volume(result, spy->read_vol_adjustment);
+ }
+ result = spy->read_queue.head;
+ spy->read_queue.head = NULL;
+ spy->read_queue.samples = 0;
+ ast_clear_flag(spy, CHANSPY_TRIGGER_FLUSH);
+ return result;
+ } else {
+ if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST)) {
+ for (result = spy->write_queue.head; result; result = result->next)
+ ast_frame_adjust_volume(result, spy->write_vol_adjustment);
+ }
+ result = spy->write_queue.head;
+ spy->write_queue.head = NULL;
+ spy->write_queue.samples = 0;
+ ast_clear_flag(spy, CHANSPY_TRIGGER_FLUSH);
+ return result;
+ }
+ }
+
+ if ((spy->read_queue.samples < samples) || (spy->write_queue.samples < samples))
+ return NULL;
+
+ /* short-circuit if both head frames have exactly what we want */
+ if ((spy->read_queue.head->samples == samples) &&
+ (spy->write_queue.head->samples == samples)) {
+ read_frame = spy->read_queue.head;
+ spy->read_queue.head = read_frame->next;
+ read_frame->next = NULL;
+
+ write_frame = spy->write_queue.head;
+ spy->write_queue.head = write_frame->next;
+ write_frame->next = NULL;
+
+ spy->read_queue.samples -= samples;
+ spy->write_queue.samples -= samples;
+
+ need_dup = 0;
+ } else {
+ copy_data_from_queue(&spy->read_queue, read_buf, samples);
+ copy_data_from_queue(&spy->write_queue, write_buf, samples);
+
+ read_frame = &stack_read_frame;
+ write_frame = &stack_write_frame;
+ need_dup = 1;
+ }
+
+ if (ast_test_flag(spy, CHANSPY_READ_VOLADJUST))
+ ast_frame_adjust_volume(read_frame, spy->read_vol_adjustment);
+
+ if (ast_test_flag(spy, CHANSPY_WRITE_VOLADJUST))
+ ast_frame_adjust_volume(write_frame, spy->write_vol_adjustment);
+
+ if (ast_test_flag(spy, CHANSPY_MIXAUDIO)) {
+ ast_frame_slinear_sum(read_frame, write_frame);
+
+ if (need_dup)
+ result = ast_frdup(read_frame);
+ else
+ result = read_frame;
+ } else {
+ if (need_dup) {
+ result = ast_frdup(read_frame);
+ result->next = ast_frdup(write_frame);
+ } else {
+ result = read_frame;
+ result->next = write_frame;
+ }
+ }
+
+ return result;
+}
More information about the svn-commits
mailing list