[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Sun Dec 4 17:37:48 CST 2011
branch "master" has been updated
via 7a865ca109654e97c88c15ed4e6bebf0f3b6a1b9 (commit)
from 1e99ad99de150e86169a828a6341717dad05e110 (commit)
Summary of changes:
src/BridgeImpl.cpp | 2 +-
src/CMakeLists.txt | 2 +
src/MediaMixer.cpp | 566 +++++++++++++++++++++++++++++++++++++++++++++
src/MediaMixer.h | 76 ++++++
src/MediaSplicer.cpp | 219 ++++++++++++++++--
src/MediaSplicer.h | 5 +-
src/SessionCollection.cpp | 20 ++-
src/SessionCollection.h | 3 +-
src/SessionOperations.cpp | 1 +
src/SessionWrapper.cpp | 3 +
src/SessionWrapper.h | 1 -
test/CMakeLists.txt | 1 +
test/UnitTests.cpp | 2 +-
13 files changed, 869 insertions(+), 32 deletions(-)
create mode 100755 src/MediaMixer.cpp
create mode 100644 src/MediaMixer.h
- Log -----------------------------------------------------------------
commit 7a865ca109654e97c88c15ed4e6bebf0f3b6a1b9
Author: Joshua Colp <jcolp at digium.com>
Date: Sun Dec 4 18:52:08 2011 -0400
Add support for mixing streams from multiple sessions within a bridge. (issue ASTSCF-2)
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 0c94a05..515d710 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1152,7 +1152,7 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
const Logger& logger) :
mActivated(false),
mState(state),
- mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger)),
+ mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger, adapter)),
mName(name),
mObjAdapter(adapter),
mListeners(listenerMgr),
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4e20b5c..1b718a9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -30,6 +30,8 @@ astscf_component_add_files(bridgeservice ServiceUtil.h)
astscf_component_add_files(bridgeservice DebugUtil.h)
astscf_component_add_files(bridgeservice MediaSplicer.h)
astscf_component_add_files(bridgeservice MediaSplicer.cpp)
+astscf_component_add_files(bridgeservice MediaMixer.h)
+astscf_component_add_files(bridgeservice MediaMixer.cpp)
astscf_component_add_files(bridgeservice Tasks.h)
astscf_component_add_files(bridgeservice InternalExceptions.h)
astscf_component_add_files(bridgeservice BridgeServiceConfig.h)
diff --git a/src/MediaMixer.cpp b/src/MediaMixer.cpp
new file mode 100755
index 0000000..4237041
--- /dev/null
+++ b/src/MediaMixer.cpp
@@ -0,0 +1,566 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include "MediaMixer.h"
+
+#include <AsteriskSCF/Media/Formats/AudioFormats.h>
+
+#include <Ice/Ice.h>
+#include <IceUtil/Shared.h>
+#include <IceUtil/Handle.h>
+
+#include <algorithm>
+
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Media::Formats::Audio::V1;
+using namespace std;
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+/**
+ * Interval (in milliseconds) that media is mixed in.
+ */
+#define MIXING_INTERVAL 20
+
+/**
+ * StreamSource servant that represents the mixed audio leaving the bridge
+ * for one participant.
+ *
+ * Within this file the object is used as a thread-safe list of the sinks
+ * that mixed frames have to be written to.
+ */
+class MediaMixerSource : public StreamSource
+{
+public:
+    MediaMixerSource(const MediaMixerPtr& mixer) : mMixer(mixer) { }
+
+    /**
+     * Registers a sink to write mixed media to. A sink that is already
+     * registered is left alone.
+     */
+    void addSink(const StreamSinkPrx& sink, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> guard(mLock);
+
+        const bool known = std::find(mSinks.begin(), mSinks.end(), sink) != mSinks.end();
+        if (!known)
+        {
+            mSinks.push_back(sink);
+        }
+    }
+
+    /**
+     * Unregisters a sink. When no sinks remain afterwards the mixer is
+     * asked to remove this source.
+     */
+    void removeSink(const StreamSinkPrx& sink, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> guard(mLock);
+
+        StreamSinkSeq::iterator firstStale = std::remove(mSinks.begin(), mSinks.end(), sink);
+        mSinks.erase(firstStale, mSinks.end());
+
+        if (!mSinks.empty())
+        {
+            return;
+        }
+
+        // Nobody is left to write to, so this source should terminate
+        mMixer->removeSource(this, mProxy);
+    }
+
+    /**
+     * Returns a copy of the registered sinks.
+     */
+    StreamSinkSeq getSinks(const Ice::Current&)
+    {
+        boost::shared_lock<boost::shared_mutex> guard(mLock);
+
+        return mSinks;
+    }
+
+    /**
+     * Returns the formats of this source. Read without the lock because the
+     * sequence is not altered once the object exists.
+     */
+    FormatSeq getFormats(const Ice::Current&)
+    {
+        return mFormats;
+    }
+
+    /**
+     * Returns the name part of this source's identity, also without the lock.
+     */
+    std::string getId(const Ice::Current&)
+    {
+        return mProxy->ice_getIdentity().name;
+    }
+
+    /**
+     * Format switching is not offered; every request is refused.
+     */
+    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
+    {
+        throw MediaFormatSwitchException();
+    }
+
+    /**
+     * Internal function which records the proxy this servant is reachable under.
+     */
+    void setProxy(const StreamSourcePrx& proxy)
+    {
+        mProxy = proxy;
+    }
+
+    /**
+     * Internal function which hands back the proxy given to setProxy().
+     */
+    StreamSourcePrx getProxy()
+    {
+        return mProxy;
+    }
+
+private:
+    /**
+     * Guards mSinks.
+     */
+    boost::shared_mutex mLock;
+
+    /**
+     * The mixer this source belongs to.
+     */
+    MediaMixerPtr mMixer;
+
+    /**
+     * Proxy to ourselves.
+     */
+    StreamSourcePrx mProxy;
+
+    /**
+     * Sinks that mixed media is written to.
+     */
+    StreamSinkSeq mSinks;
+
+    /**
+     * Formats supported by this source. Nothing in this class fills it in.
+     */
+    FormatSeq mFormats;
+};
+
+/**
+ * Implementation of a StreamSink which provides audio to be mixed into the bridge.
+ *
+ * Frames handed to write() are queued until the mixer collects them with
+ * getFrame() and releases them with popFrame(). Each sink also carries the
+ * MediaMixerSource it was created with, which the mixer uses to find out
+ * where the participant's outgoing media has to be written.
+ */
+class MediaMixerSink : public StreamSink
+{
+public:
+    MediaMixerSink(const MediaMixerPtr& mixer, const MediaMixerSourcePtr& source) :
+        mMixer(mixer), mLocalSource(source) { }
+
+    /**
+     * Queues received frames for mixing.
+     *
+     * getFrame() consumes from the back of mFrames, so the oldest pending
+     * frame has to sit at the back. The batch is therefore inserted at the
+     * front in reverse; inserting it in its given order would make a
+     * multi-frame write come out last-to-first. This assumes the frames of
+     * one write arrive oldest-first.
+     */
+    void write(const FrameSeq& frames, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        mFrames.insert(mFrames.begin(), frames.rbegin(), frames.rend());
+    }
+
+    /**
+     * Sets the source feeding this sink. A null proxy asks the mixer to
+     * remove this sink instead.
+     */
+    void setSource(const StreamSourcePrx& source, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        if (source == 0)
+        {
+            mMixer->removeSink(this, mProxy);
+        }
+        else
+        {
+            mSource = source;
+        }
+    }
+
+    /**
+     * Returns the source last given to setSource().
+     */
+    StreamSourcePrx getSource(const Ice::Current&)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+        return mSource;
+    }
+
+    /**
+     * Returns the formats of this sink; read without the lock.
+     */
+    FormatSeq getFormats(const Ice::Current&)
+    {
+        return mFormats;
+    }
+
+    /**
+     * Returns the name part of this sink's identity; read without the lock.
+     */
+    std::string getId(const Ice::Current&)
+    {
+        return mProxy->ice_getIdentity().name;
+    }
+
+    /**
+     * Internal function which returns the latest frame to use in the mixer.
+     *
+     * Moves the oldest queued frame, if any, into mProvidedFrame and returns
+     * a reference to that member. With an empty queue the member is returned
+     * as it is, which is a null handle once popFrame() has been called.
+     */
+    const FramePtr& getFrame()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        if (!mFrames.empty())
+        {
+            mProvidedFrame = mFrames.back();
+            mFrames.pop_back();
+        }
+
+        return mProvidedFrame;
+    }
+
+    /**
+     * Internal function which removes the latest frame, presumably having already been mixed.
+     *
+     * @return The frame getFrame() last provided, or a null handle.
+     */
+    const FramePtr popFrame()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        FramePtr frame = mProvidedFrame;
+        mProvidedFrame = 0;
+
+        return frame;
+    }
+
+    /**
+     * Internal function which sets our proxy.
+     */
+    void setProxy(const StreamSinkPrx& proxy)
+    {
+        mProxy = proxy;
+    }
+
+    /**
+     * Internal function which returns our proxy.
+     */
+    StreamSinkPrx getProxy()
+    {
+        return mProxy;
+    }
+
+    /**
+     * Internal function which returns the local source.
+     */
+    MediaMixerSourcePtr getLocalSource()
+    {
+        return mLocalSource;
+    }
+
+private:
+    /**
+     * Lock which protects this sink.
+     */
+    boost::shared_mutex mLock;
+
+    /**
+     * Media mixer we are associated with.
+     */
+    MediaMixerPtr mMixer;
+
+    /**
+     * Pointer to our source of media, this exists within this class because the two
+     * are inherently associated.
+     */
+    MediaMixerSourcePtr mLocalSource;
+
+    /**
+     * Proxy to ourselves.
+     */
+    StreamSinkPrx mProxy;
+
+    /**
+     * Sequence of received frames pending mixing; newest at the front, oldest at the back.
+     */
+    FrameSeq mFrames;
+
+    /**
+     * Source of media for this sink.
+     */
+    StreamSourcePrx mSource;
+
+    /**
+     * Formats supported by this sink. Nothing in this class fills it in.
+     */
+    FormatSeq mFormats;
+
+    /**
+     * Frame we provided when getFrame was called.
+     */
+    FramePtr mProvidedFrame;
+};
+
+/**
+ * Callback target handed to asynchronous writes to sinks. Neither outcome
+ * is acted upon.
+ */
+class WriteCallback : public IceUtil::Shared
+{
+public:
+    /** Called when a write has completed. */
+    void writtenCB() { }
+
+    /** Called when a write has failed; the exception is not looked at. */
+    void failureCB(const Ice::Exception&) { }
+};
+
+/**
+ * Smart pointer for the above WriteCallback class.
+ */
+typedef IceUtil::Handle<WriteCallback> WriteCallbackPtr;
+
+/**
+ * State of a MediaMixer, kept out of the header. All members are public
+ * and are accessed directly by MediaMixer.
+ */
+class MediaMixerI
+{
+public:
+    MediaMixerI(const Ice::ObjectAdapterPtr& adapter, const FormatPtr& format) :
+        mAdapter(adapter),
+        mFormat(format),
+        mTimer(new IceUtil::Timer()),
+        mCallback(new WriteCallback())
+    {
+    }
+
+    /** Lock which protects the mixer. */
+    boost::shared_mutex mLock;
+
+    /** Object adapter that sinks/sources are on. */
+    Ice::ObjectAdapterPtr mAdapter;
+
+    /** Format for produced frames. */
+    FormatPtr mFormat;
+
+    /** Vector of sinks for media. */
+    std::vector<MediaMixerSinkPtr> mSinks;
+
+    /** Vector of sources for media. */
+    std::vector<MediaMixerSourcePtr> mSources;
+
+    /** Timer thread used to mix the media. */
+    IceUtil::TimerPtr mTimer;
+
+    /** Callback object for when writing to sinks. */
+    WriteCallbackPtr mCallback;
+};
+
+/**
+ * Transform function which performs a signed linear saturated add.
+ *
+ * The sum is formed as an int and clamped to the range of a 16 bit signed
+ * sample, -32768 to 32767, so that a representable result is never altered.
+ *
+ * @param input First sample.
+ * @param value Sample to add to it.
+ * @return The clamped sum.
+ */
+static short saturatedSignedLinearAdd(short input, short value)
+{
+    const int res = input + value;
+
+    if (res > 32767)
+    {
+        return 32767;
+    }
+
+    if (res < -32768)
+    {
+        return -32768;
+    }
+
+    return static_cast<short>(res);
+}
+
+/**
+ * Transform function which performs a signed linear saturated subtract.
+ *
+ * The difference is formed as an int and clamped to the range of a 16 bit
+ * signed sample, -32768 to 32767, so that a representable result is never
+ * altered.
+ *
+ * @param input Sample to subtract from.
+ * @param value Sample to subtract.
+ * @return The clamped difference.
+ */
+static short saturatedSignedLinearSubtract(short input, short value)
+{
+    const int res = input - value;
+
+    if (res > 32767)
+    {
+        return 32767;
+    }
+
+    if (res < -32768)
+    {
+        return -32768;
+    }
+
+    return static_cast<short>(res);
+}
+
+/**
+ * Creates the mixer state and schedules the first mixing pass on the
+ * mixer's own timer, MIXING_INTERVAL milliseconds from now.
+ */
+MediaMixer::MediaMixer(const Ice::ObjectAdapterPtr& adapter, const FormatPtr& format) :
+    mImpl(new MediaMixerI(adapter, format))
+{
+    const IceUtil::Time firstTick = IceUtil::Time::milliSeconds(MIXING_INTERVAL);
+    mImpl->mTimer->schedule(this, firstTick);
+}
+
+/**
+ * Creates, or looks up, the mixer-side source and sink for one participant stream.
+ *
+ * Both objects live on the mixer's adapter under the given identity with
+ * ".mixer" appended to its name. An identity that is already registered is
+ * reused and only a proxy is produced for it.
+ *
+ * @param sinkId Identity of the participant's sink.
+ * @param mixerSink Receives the proxy of the mixer sink.
+ * @param sourceId Identity of the participant's source.
+ * @param mixerSource Receives the proxy of the mixer source.
+ */
+void MediaMixer::createMixing(const Ice::Identity& sinkId, AsteriskSCF::Media::V1::StreamSinkPrx& mixerSink,
+    const Ice::Identity& sourceId, AsteriskSCF::Media::V1::StreamSourcePrx& mixerSource)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    // The source is where mixed audio will originate from, within this implementation it essentially acts
+    // as a store of sinks we need to write to for this source
+    Ice::Identity id = sourceId;
+    id.name += ".mixer";
+    MediaMixerSourcePtr source;
+    Ice::ObjectPtr existingSource = mImpl->mAdapter->find(id);
+    if (existingSource)
+    {
+        mixerSource = StreamSourcePrx::uncheckedCast(mImpl->mAdapter->createProxy(id));
+
+        // Recover the servant as well: a sink created below must know its local source,
+        // otherwise the mixing pass has nothing to write that participant's media to
+        source = MediaMixerSourcePtr::dynamicCast(existingSource.get());
+    }
+    else
+    {
+        source = new MediaMixerSource(this);
+        mImpl->mSources.push_back(source);
+        mixerSource = StreamSourcePrx::uncheckedCast(mImpl->mAdapter->add(source, id));
+        source->setProxy(mixerSource);
+    }
+
+    // The sink is where the source will send audio to to be mixed into the bridge, the reason the local source
+    // is associated is because we need to know what media to remove from the mixed frame when writing out
+    id = sinkId;
+    id.name += ".mixer";
+    if (mImpl->mAdapter->find(id))
+    {
+        mixerSink = StreamSinkPrx::uncheckedCast(mImpl->mAdapter->createProxy(id));
+    }
+    else
+    {
+        MediaMixerSinkPtr sink = new MediaMixerSink(this, source);
+        mImpl->mSinks.push_back(sink);
+        mixerSink = StreamSinkPrx::uncheckedCast(mImpl->mAdapter->add(sink, id));
+        sink->setProxy(mixerSink);
+    }
+}
+
+/**
+ * One mixing pass, run on the mixer's timer.
+ *
+ * Takes at most one pending frame from every sink, sums the short-sequence
+ * payloads into a single mixed frame, then writes to each participant the
+ * mix with that participant's own contribution subtracted again. Finally
+ * the next pass is scheduled.
+ */
+void MediaMixer::runTimerTask()
+{
+    IceUtil::Time start = IceUtil::Time::now();
+
+    // Work on a snapshot so the mixer lock is not held while mixing and writing
+    std::vector<MediaMixerSinkPtr> sinks;
+
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+        sinks = mImpl->mSinks;
+    }
+
+    FramePtr frame = new Frame();
+    frame->mediaFormat = mImpl->mFormat;
+
+    ShortSeqPayloadPtr mixedPayload = new ShortSeqPayload();
+    frame->payload = mixedPayload;
+
+    for (std::vector<MediaMixerSinkPtr>::const_iterator sink = sinks.begin();
+         sink != sinks.end();
+         ++sink)
+    {
+        FramePtr sinkFrame = (*sink)->getFrame();
+
+        // If no frame exists for us to mix in skip doing so
+        if (!sinkFrame)
+        {
+            continue;
+        }
+
+        ShortSeqPayloadPtr payload = ShortSeqPayloadPtr::dynamicCast(sinkFrame->payload);
+
+        // We only currently support short sequence payloads
+        if (!payload)
+        {
+            continue;
+        }
+
+        // If no audio exists within the mixedPayload yet just copy this frame in, otherwise mix it in
+        if (mixedPayload->payload.empty())
+        {
+            mixedPayload->payload = payload->payload;
+            continue;
+        }
+
+        // Frames are not guaranteed to be equally long; grow the mix with silence so that
+        // neither sequence is read or written past its end
+        if (mixedPayload->payload.size() < payload->payload.size())
+        {
+            mixedPayload->payload.resize(payload->payload.size());
+        }
+
+        std::transform(payload->payload.begin(), payload->payload.end(),
+            mixedPayload->payload.begin(), mixedPayload->payload.begin(),
+            saturatedSignedLinearAdd);
+    }
+
+    for (std::vector<MediaMixerSinkPtr>::const_iterator sink = sinks.begin();
+         sink != sinks.end();
+         ++sink)
+    {
+        // Always pop, even if the frame turns out to be unusable, so it is not offered again
+        FramePtr sinkFrame = (*sink)->popFrame();
+        FramePtr theirFrame(frame);
+
+        // Only a short sequence payload was mixed in above, so only that can be taken out again
+        ShortSeqPayloadPtr sinkPayload;
+        if (sinkFrame)
+        {
+            sinkPayload = ShortSeqPayloadPtr::dynamicCast(sinkFrame->payload);
+        }
+
+        if (sinkPayload)
+        {
+            theirFrame = FramePtr::dynamicCast(frame->ice_clone());
+
+            // Remove the participants own media so they do not hear themselves
+            ShortSeqPayloadPtr payload = new ShortSeqPayload();
+            payload->payload = mixedPayload->payload;
+
+            const size_t common = std::min(payload->payload.size(), sinkPayload->payload.size());
+            std::transform(payload->payload.begin(), payload->payload.begin() + common,
+                sinkPayload->payload.begin(), payload->payload.begin(),
+                saturatedSignedLinearSubtract);
+            theirFrame->payload = payload;
+        }
+
+        // A sink without a local source has nowhere to send mixed media
+        MediaMixerSourcePtr localSource = (*sink)->getLocalSource();
+        if (!localSource)
+        {
+            continue;
+        }
+
+        // Send this frame to all the sinks we should
+        StreamSinkSeq sourceSinks = localSource->getSinks(Ice::Current());
+
+        FrameSeq frames;
+        frames.push_back(theirFrame);
+
+        for (StreamSinkSeq::const_iterator sourceSink = sourceSinks.begin();
+             sourceSink != sourceSinks.end();
+             ++sourceSink)
+        {
+            (*sourceSink)->begin_write(frames, newCallback_StreamSink_write(mImpl->mCallback,
+                &WriteCallback::writtenCB,
+                &WriteCallback::failureCB));
+        }
+    }
+
+    // Aim for one pass per MIXING_INTERVAL; a pass that overran is followed immediately
+    IceUtil::Int64 remaining = MIXING_INTERVAL - (IceUtil::Time::now() - start).toMilliSeconds();
+    if (remaining < 0)
+    {
+        remaining = 0;
+    }
+
+    try
+    {
+        mImpl->mTimer->schedule(this, IceUtil::Time::milliSeconds(remaining));
+    }
+    catch (const IceUtil::IllegalArgumentException&)
+    {
+        // IceUtil::Timer rejects new tasks once it has been destroyed, which stop() does.
+        // That is how the mixing loop ends, so there is nothing to report.
+    }
+}
+
+/**
+ * Forgets a source and takes its servant off the object adapter.
+ */
+void MediaMixer::removeSource(const MediaMixerSourcePtr& source, const StreamSourcePrx& proxy)
+{
+    boost::unique_lock<boost::shared_mutex> guard(mImpl->mLock);
+
+    std::vector<MediaMixerSourcePtr>& known = mImpl->mSources;
+    known.erase(std::remove(known.begin(), known.end(), source), known.end());
+
+    mImpl->mAdapter->remove(proxy->ice_getIdentity());
+}
+
+/**
+ * Forgets a sink and takes its servant off the object adapter.
+ */
+void MediaMixer::removeSink(const MediaMixerSinkPtr& sink, const StreamSinkPrx& proxy)
+{
+    boost::unique_lock<boost::shared_mutex> guard(mImpl->mLock);
+
+    std::vector<MediaMixerSinkPtr>& known = mImpl->mSinks;
+    known.erase(std::remove(known.begin(), known.end(), sink), known.end());
+
+    mImpl->mAdapter->remove(proxy->ice_getIdentity());
+}
+
+/**
+ * Stops mixing and removes every sink and source of this mixer from the
+ * object adapter.
+ */
+void MediaMixer::stop()
+{
+    // Shut the timer down before taking the lock. IceUtil::Timer::destroy() waits for the
+    // timer thread unless it is called from that thread, and runTimerTask() acquires mLock;
+    // destroying the timer while holding the lock could leave both threads waiting on each other.
+    mImpl->mTimer->destroy();
+
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    for (std::vector<MediaMixerSinkPtr>::const_iterator sink = mImpl->mSinks.begin();
+         sink != mImpl->mSinks.end();
+         ++sink)
+    {
+        mImpl->mAdapter->remove((*sink)->getProxy()->ice_getIdentity());
+    }
+
+    mImpl->mSinks.clear();
+
+    for (std::vector<MediaMixerSourcePtr>::const_iterator source = mImpl->mSources.begin();
+         source != mImpl->mSources.end();
+         ++source)
+    {
+        mImpl->mAdapter->remove((*source)->getProxy()->ice_getIdentity());
+    }
+
+    mImpl->mSources.clear();
+}
+
+}
+}
diff --git a/src/MediaMixer.h b/src/MediaMixer.h
new file mode 100644
index 0000000..d167e25
--- /dev/null
+++ b/src/MediaMixer.h
@@ -0,0 +1,76 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <IceUtil/Shared.h>
+#include <IceUtil/Timer.h>
+#include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/logger.h>
+#include <boost/shared_ptr.hpp>
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+/**
+ * Forward declarations.
+ */
+class MediaMixerSource;
+class MediaMixerSink;
+
+/**
+ * Smart pointer type for MediaMixerSource class.
+ */
+typedef IceUtil::Handle<MediaMixerSource> MediaMixerSourcePtr;
+
+/**
+ * Smart pointer type for MediaMixerSink class.
+ */
+typedef IceUtil::Handle<MediaMixerSink> MediaMixerSinkPtr;
+
+/**
+ * Implementation details for MediaMixer class.
+ */
+class MediaMixerI;
+
+/**
+ * Mixes the media of several sessions. The mixer is itself the timer task
+ * that performs the periodic mixing pass.
+ */
+class MediaMixer : public IceUtil::TimerTask
+{
+public:
+    /**
+     * @param adapter Object adapter the mixer's sinks and sources are added to.
+     * @param format Format given to the frames the mixer produces.
+     */
+    MediaMixer(const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::Media::V1::FormatPtr& format);
+
+    /**
+     * Provides the mixer-side sink and source for one participant stream.
+     *
+     * @param sinkId Identity of the participant's sink.
+     * @param mixerSink Receives the proxy of the mixer sink.
+     * @param sourceId Identity of the participant's source.
+     * @param mixerSource Receives the proxy of the mixer source.
+     */
+    void createMixing(const Ice::Identity& sinkId, AsteriskSCF::Media::V1::StreamSinkPrx& mixerSink,
+        const Ice::Identity& sourceId, AsteriskSCF::Media::V1::StreamSourcePrx& mixerSource);
+
+    /** Removes a source from the mixer and from the object adapter. */
+    void removeSource(const MediaMixerSourcePtr& source, const AsteriskSCF::Media::V1::StreamSourcePrx& proxy);
+
+    /** Removes a sink from the mixer and from the object adapter. */
+    void removeSink(const MediaMixerSinkPtr& sink, const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
+
+    /** Performs one mixing pass; invoked by the timer. */
+    void runTimerTask();
+
+    /** Ends mixing and removes all sinks and sources. */
+    void stop();
+
+private:
+    /**
+     * Private implementation details for MediaMixer.
+     */
+    boost::shared_ptr<MediaMixerI> mImpl;
+};
+
+/**
+ * Smart pointer for above MediaMixer class.
+ */
+typedef IceUtil::Handle<MediaMixer> MediaMixerPtr;
+
+} // End of namespace BridgeService
+} // End of namespace AsteriskSCF
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 8022a66..27411cc 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -14,6 +14,8 @@
* at the top of the source tree.
*/
#include "MediaSplicer.h"
+#include "MediaMixer.h"
+#include <AsteriskSCF/Media/Formats/AudioFormats.h>
#include <IceUtil/Shared.h>
#include <IceUtil/Handle.h>
#include "BridgeServiceConfig.h"
@@ -37,6 +39,7 @@
//
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Media::Formats::Audio::V1;
using namespace AsteriskSCF::Replication::BridgeService::V1;
using namespace std;
@@ -385,19 +388,20 @@ private:
};
QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
- const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
-//
-// TODO: This needs to register the streams with an active threaded mixing element.
-//
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
+ const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer, bool mixer);
+
class MediaSplicerI : public boost::enable_shared_from_this<MediaSplicerI>
{
public:
- MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger) :
+ MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger,
+ const Ice::ObjectAdapterPtr& adapter) :
mCommunicator(communicator),
mBridgeId(bridgeId),
mReplicator(replicator),
- mLogger(logger)
+ mLogger(logger),
+ mUsingMixer(false),
+ mAdapter(adapter)
{
}
@@ -460,7 +464,7 @@ public:
// An alternative is to pass back the queued tasks to the caller and let them start and stop the process.
//
ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, sessionPrx, existing,
- shared_from_this()), mLogger));
+ shared_from_this(), mUsingMixer), mLogger));
taskExecutor->start();
}
@@ -493,18 +497,141 @@ public:
return result;
}
+    /**
+     * Marks this splicer as mixing; the flag is passed along when media
+     * connect tasks are created.
+     */
+    void enableMixing()
+    {
+        mUsingMixer = true;
+    }
+
+    /**
+     * Stops and discards every media mixer and clears the mixing flag.
+     * Does nothing when mixing is not enabled.
+     */
+    void disableMixing()
+    {
+        if (!mUsingMixer)
+        {
+            return;
+        }
+
+        MediaMixers::const_iterator mixer = mMediaMixers.begin();
+        while (mixer != mMediaMixers.end())
+        {
+            mixer->second->stop();
+            ++mixer;
+        }
+
+        mMediaMixers.clear();
+        mUsingMixer = false;
+    }
+
+    /**
+     * Inspects a session pairing and, when its first incoming media pairing
+     * points at a mixer sink, switches this splicer to mixing and sets up
+     * the mixers for the pairing's streams.
+     */
+    void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& pairing)
+    {
+        const bool hasBothDirections =
+            !pairing->incomingMediaPairings.empty() && !pairing->outgoingMediaPairings.empty();
+        if (!hasBothDirections)
+        {
+            return;
+        }
+
+        // Mixer objects are recognisable by the ".mixer" part of their identity name
+        const std::string sinkName = pairing->incomingMediaPairings.front()->sink->ice_getIdentity().name;
+        if (sinkName.rfind(".mixer") == std::string::npos)
+        {
+            return;
+        }
+
+        mUsingMixer = true;
+
+        // Only the side effects on the mixers are wanted here; the pairings are discarded
+        vector<OutgoingPairing> outgoing;
+        vector<IncomingPairing> incoming;
+        findCompatiblePairings(pairing->streams, outgoing, incoming);
+    }
+
+    /**
+     * Connects the streams of a session to media mixers.
+     *
+     * For every stream that is not removed and offers only signed linear
+     * formats, a mixer for the stream's first format is looked up or created.
+     * Each sink/source pair of the stream is then registered with that mixer
+     * and the resulting pairings are appended to the output vectors.
+     *
+     * @param streams Streams of the session being connected.
+     * @param outgoing Receives pairings of a session sink with a mixer source.
+     * @param incoming Receives pairings of a session source with a mixer sink.
+     */
+    void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+        vector<IncomingPairing>& incoming)
+    {
+        // mMediaMixers may gain an entry below, so shared (reader) access is not enough
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        // We create a copy of the vector so we do not end up connecting multiple streams to the same mixer
+        MediaMixers mixers = mMediaMixers;
+
+        for (StreamInformationDict::const_iterator stream = streams.begin();
+             stream != streams.end();
+             ++stream)
+        {
+            if (stream->second->state == Removed)
+            {
+                continue;
+            }
+
+            // Without any format there is nothing to choose a mixer by
+            if (stream->second->formats.empty())
+            {
+                continue;
+            }
+
+            MediaMixerPtr mixer;
+            bool allowed = true;
+
+            // Only allow signed linear formats to create or find a mixer
+            for (FormatSeq::const_iterator format = stream->second->formats.begin();
+                 format != stream->second->formats.end();
+                 ++format)
+            {
+                SignedLinearPtr slin = SignedLinearPtr::dynamicCast((*format));
+
+                if (!slin)
+                {
+                    allowed = false;
+                    break;
+                }
+            }
+
+            // If the stream contains an unsupported format do not go further, do not pass go
+            if (allowed == false)
+            {
+                continue;
+            }
+
+            // Look for a mixer based on the format on this stream
+            std::string format = stream->second->formats.front()->name;
+            MediaMixers::const_iterator existingMixer = mixers.find(format);
+
+            if (existingMixer == mixers.end())
+            {
+                mLogger(Debug) << FUNLOG << ": creating media mixer for format " << format;
+                mixer = new MediaMixer(mAdapter, stream->second->formats.front());
+                // NOTE(review): the map holds one mixer per format name; if an entry for this
+                // format already exists the insert has no effect and this mixer is not tracked
+                mMediaMixers.insert(make_pair(format, mixer));
+            }
+            else
+            {
+                mLogger(Debug) << FUNLOG << ": using found media mixer for format " << format;
+                mixer = existingMixer->second;
+                mixers.erase(format);
+            }
+
+            StreamSinkSeq sinks = stream->second->sinks;
+            StreamSourceSeq sources = stream->second->sources;
+
+            // Iterate through sources and sinks to connect them to the mixer
+            // Note that we must have a matching number of sinks and sources, this is so we can cancel out
+            // their media from the mixing
+            while (!sinks.empty() && !sources.empty())
+            {
+                // These proxies will contain the details for the mixer
+                StreamSourcePrx mixerSource;
+                StreamSinkPrx mixerSink;
+
+                // Ask the mixer to create a source and sink for our source and sink
+                mixer->createMixing(sinks.back()->ice_getIdentity(), mixerSink,
+                    sources.back()->ice_getIdentity(), mixerSource);
+
+                // Now that we have done so we can create pairings
+                outgoing.push_back(OutgoingPairing(sinks.back(), mixerSource));
+                incoming.push_back(IncomingPairing(sources.back(), mixerSink));
+
+                // These sink and source are completely done, don't touch them again
+                sinks.pop_back();
+                sources.pop_back();
+            }
+        }
+    }
+
void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
vector<IncomingPairing>& incoming,
MediaConnectorIPtr connector,
std::map<std::string, std::string>& connections,
DirectMediaConnectionDict& directConnections)
{
- // If no streams are present we can not establish any pairings
- if (streams.empty())
- {
- return;
- }
-
boost::shared_lock<boost::shared_mutex> lock(mLock);
for (MediaConnectors::const_iterator i = mConnectors.begin(); i != mConnectors.end(); ++i)
@@ -586,11 +713,17 @@ private:
string mBridgeId;
ReplicatorSmartPrx mReplicator;
Logger mLogger;
+ bool mUsingMixer;
+ Ice::ObjectAdapterPtr mAdapter;
typedef vector<MediaConnectorIPtr> MediaConnectors;
MediaConnectors mConnectors;
+ typedef std::map<std::string, MediaMixerPtr> MediaMixers;
+
+ MediaMixers mMediaMixers;
+
bool hasPreferredCodecOverride()
{
//
@@ -676,24 +809,40 @@ private:
class GetCompatiblePairings : public QueuedTask
{
public:
- GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials) :
+ GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials, bool mixer) :
QueuedTask("GetCompatiblePairings"),
mSplicer(splicer),
- mMaterials(materials)
+ mMaterials(materials),
+ mUsingMixer(mixer)
{
}
protected:
bool executeImpl()
{
- mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
- mMaterials->connector, mMaterials->connections, mMaterials->directConnections);
+
+ if (mMaterials->streams.empty())
+ {
+ return true;
+ }
+
+ if (mUsingMixer == true)
+ {
+ mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+ }
+ else
+ {
+ mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
+ mMaterials->connector, mMaterials->connections, mMaterials->directConnections);
+ }
+
return true;
}
private:
MediaSplicerIPtr mSplicer;
MediaConnectorBuilderPtr mMaterials;
+ bool mUsingMixer;
};
//
@@ -986,8 +1135,8 @@ private:
};
QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
- const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer)
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
+ const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer, bool mixer)
{
QueuedTasks tasks;
MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
@@ -995,9 +1144,15 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
materials->sessionPrx = sessionPrx;
tasks.push_back(new GetStreams(session, materials));
tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
- tasks.push_back(new GetCompatiblePairings(splicer, materials));
+ tasks.push_back(new GetCompatiblePairings(splicer, materials, mixer));
tasks.push_back(new MakeConnections(materials));
- tasks.push_back(new MakeDirectConnections(materials));
+
+ // We can not establish direct connections when doing mixing
+ if (mixer == false)
+ {
+ tasks.push_back(new MakeDirectConnections(materials));
+ }
+
return tasks;
}
@@ -1007,8 +1162,9 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
using namespace AsteriskSCF::BridgeService;
MediaSplicer::MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
- const Logger& logger) :
- mImpl(new MediaSplicerI(comm, bridgeId, replicator, logger))
+ const Logger& logger,
+ const Ice::ObjectAdapterPtr& adapter) :
+ mImpl(new MediaSplicerI(comm, bridgeId, replicator, logger, adapter))
{
}
@@ -1021,3 +1177,18 @@ MediaConnectorPtr MediaSplicer::createReplica(const SessionPairingPtr& pairings)
{
return mImpl->createReplica(pairings);
}
+
+/**
+ * Forwards to the implementation object.
+ */
+void MediaSplicer::enableMixing()
+{
+    MediaSplicerI& impl = *mImpl;
+    impl.enableMixing();
+}
+
+/**
+ * Forwards to the implementation object.
+ */
+void MediaSplicer::disableMixing()
+{
+    MediaSplicerI& impl = *mImpl;
+    impl.disableMixing();
+}
+
+/**
+ * Forwards the pairing to the implementation object.
+ */
+void MediaSplicer::update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& pairing)
+{
+    MediaSplicerI& impl = *mImpl;
+    impl.update(pairing);
+}
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index 31b3f19..18aa36f 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -85,10 +85,13 @@ class MediaSplicer : public IceUtil::Shared
{
public:
MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
- const AsteriskSCF::System::Logging::Logger& logger);
+ const AsteriskSCF::System::Logging::Logger& logger, const Ice::ObjectAdapterPtr&);
void connect(const SessionWrapperPtr& session, const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx);
MediaConnectorPtr createReplica(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& pairings);
+ void enableMixing();
+ void disableMixing();
+ void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr&);
private:
boost::shared_ptr<MediaSplicerI> mImpl;
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index 7d06ef6..bd3f1ff 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -26,14 +26,15 @@ using namespace AsteriskSCF;
using namespace std;
SessionCollection::SessionCollection(const Ice::CommunicatorPtr& comm, const string& bridgeId,
- const ReplicatorSmartPrx& replicator,
- const Logger& logger) :
+ const ReplicatorSmartPrx& replicator,
+ const Logger& logger,
+ const Ice::ObjectAdapterPtr& adapter) :
mCommunicator(comm),
mReplicator(replicator),
mLogger(logger),
mBridgeId(bridgeId),
mSessionCounter(0),
- mSplicer(new MediaSplicer(comm, bridgeId, replicator, logger))
+ mSplicer(new MediaSplicer(comm, bridgeId, replicator, logger, adapter))
{
}
@@ -86,6 +87,13 @@ SessionWrapperPtr SessionCollection::addSession(const SessionPrx& session)
SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mSplicer, mReplicator, mLogger));
mMap[key] = newSession;
++mSessionCounter;
+
+ // If this collection now requires mixing turn it on
+ if (mMap.size() > 2)
+ {
+ mSplicer->enableMixing();
+ }
+
return newSession;
}
@@ -200,4 +208,10 @@ void SessionCollection::removeSession(const BridgedSessionPtr& session)
{
mMap.erase(i);
}
+
+ // If this collection does not require mixing make sure it is disabled
+ if (mMap.size() < 3)
+ {
+ mSplicer->disableMixing();
+ }
}
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index d306da9..d758e85 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -47,7 +47,8 @@ class SessionCollection : public IceUtil::Shared
public:
SessionCollection(const Ice::CommunicatorPtr& communicator, const std::string& bridgeId,
- const ReplicatorSmartPrx& replicator, const AsteriskSCF::System::Logging::Logger& logger);
+ const ReplicatorSmartPrx& replicator, const AsteriskSCF::System::Logging::Logger& logger,
+ const Ice::ObjectAdapterPtr& adapter);
/**
* Obtains the SessionWrapper instance for the specified proxy.
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 38c0184..df4b8ae 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -336,3 +336,4 @@ void DisconnectTelephonyOperation::disconnectSinks
(*i)->removeSinks(sinksToRemove);
}
}
+
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 4adbd59..11b59b8 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -508,6 +508,9 @@ void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
{
mLogger(Trace) << FUNLOG << " for " << mId;
boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ mSplicer->update(pairings);
+
if (mConnector)
{
mConnector->update(pairings);
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 6fc3f43..a03d78b 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -172,7 +172,6 @@ private:
AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mSessionController;
AsteriskSCF::SessionCommunications::PartyIdentification::V1::ConnectedLinePtr mConnectedLine;
bool mConnectedLineSet;
-
/**
* Sends changes to the replication service. This should never occur
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9773c64..979d462 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -30,6 +30,7 @@ astscf_component_add_files(bridge_unit_tests ../src/SessionCollection.cpp)
astscf_component_add_files(bridge_unit_tests ../src/SessionOperations.cpp)
astscf_component_add_files(bridge_unit_tests ../src/SessionWrapper.cpp)
astscf_component_add_files(bridge_unit_tests ../src/MediaSplicer.cpp)
+astscf_component_add_files(bridge_unit_tests ../src/MediaMixer.cpp)
astscf_component_add_slices(bridge_unit_tests PROJECT AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice)
astscf_component_add_files(bridge_unit_tests UnitTests.cpp)
astscf_component_add_ice_libraries(bridge_unit_tests Ice)
diff --git a/test/UnitTests.cpp b/test/UnitTests.cpp
index 8a8ebb7..7af1dd8 100644
--- a/test/UnitTests.cpp
+++ b/test/UnitTests.cpp
@@ -351,7 +351,7 @@ BOOST_FIXTURE_TEST_CASE(testGetSeq, Fixture)
{
IceEnvironment iceEnv;
AsteriskSCF::BridgeService::ReplicatorSmartPrx rep;
- SessionCollectionPtr collection(new SessionCollection(iceEnv.communicator(), "test", rep, getLogger()));
+ SessionCollectionPtr collection(new SessionCollection(iceEnv.communicator(), "test", rep, getLogger(), 0));
SessionPrx dummy = SessionPrx::uncheckedCast(iceEnv.communicator()->stringToProxy("foo0:default"));
collection->replicaUpdate(createBridgedSession(Added, dummy));
-----------------------------------------------------------------------
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list