[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Sun Dec 4 17:37:48 CST 2011


branch "master" has been updated
       via  7a865ca109654e97c88c15ed4e6bebf0f3b6a1b9 (commit)
      from  1e99ad99de150e86169a828a6341717dad05e110 (commit)

Summary of changes:
 src/BridgeImpl.cpp        |    2 +-
 src/CMakeLists.txt        |    2 +
 src/MediaMixer.cpp        |  566 +++++++++++++++++++++++++++++++++++++++++++++
 src/MediaMixer.h          |   76 ++++++
 src/MediaSplicer.cpp      |  219 ++++++++++++++++--
 src/MediaSplicer.h        |    5 +-
 src/SessionCollection.cpp |   20 ++-
 src/SessionCollection.h   |    3 +-
 src/SessionOperations.cpp |    1 +
 src/SessionWrapper.cpp    |    3 +
 src/SessionWrapper.h      |    1 -
 test/CMakeLists.txt       |    1 +
 test/UnitTests.cpp        |    2 +-
 13 files changed, 869 insertions(+), 32 deletions(-)
 create mode 100755 src/MediaMixer.cpp
 create mode 100644 src/MediaMixer.h


- Log -----------------------------------------------------------------
commit 7a865ca109654e97c88c15ed4e6bebf0f3b6a1b9
Author: Joshua Colp <jcolp at digium.com>
Date:   Sun Dec 4 18:52:08 2011 -0400

    Add support for mixing streams from multiple sessions within a bridge. (issue ASTSCF-2)

diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 0c94a05..515d710 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1152,7 +1152,7 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
         const Logger& logger) :
     mActivated(false),
     mState(state),
-    mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger)),
+    mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger, adapter)),
     mName(name),
     mObjAdapter(adapter),
     mListeners(listenerMgr),
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4e20b5c..1b718a9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -30,6 +30,8 @@ astscf_component_add_files(bridgeservice ServiceUtil.h)
 astscf_component_add_files(bridgeservice DebugUtil.h)
 astscf_component_add_files(bridgeservice MediaSplicer.h)
 astscf_component_add_files(bridgeservice MediaSplicer.cpp)
+astscf_component_add_files(bridgeservice MediaMixer.h)
+astscf_component_add_files(bridgeservice MediaMixer.cpp)
 astscf_component_add_files(bridgeservice Tasks.h)
 astscf_component_add_files(bridgeservice InternalExceptions.h)
 astscf_component_add_files(bridgeservice BridgeServiceConfig.h)
diff --git a/src/MediaMixer.cpp b/src/MediaMixer.cpp
new file mode 100755
index 0000000..4237041
--- /dev/null
+++ b/src/MediaMixer.cpp
@@ -0,0 +1,566 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include "MediaMixer.h"
+
+#include <AsteriskSCF/Media/Formats/AudioFormats.h>
+
+#include <Ice/Ice.h>
+#include <IceUtil/Shared.h>
+#include <IceUtil/Handle.h>
+
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Media::Formats::Audio::V1;
+using namespace std;
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+/**
+ * Interval (in milliseconds) that media is mixed in.
+ */
+#define MIXING_INTERVAL 20
+
+/**
+ * Implementation of a StreamSource which provides mixed audio.
+ */
+class MediaMixerSource : public StreamSource
+{
+public:
+    MediaMixerSource(const MediaMixerPtr& mixer) : mMixer(mixer) { }
+
+    void addSink(const StreamSinkPrx& sink, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        if (std::find(mSinks.begin(), mSinks.end(), sink) != mSinks.end())
+        {
+            return;
+        }
+
+        mSinks.push_back(sink);
+    }
+
+    void removeSink(const StreamSinkPrx& sink, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        mSinks.erase(std::remove(mSinks.begin(), mSinks.end(), sink), mSinks.end());
+
+        // If no more sinks exist then this should terminate
+        if (mSinks.empty())
+        {
+            mMixer->removeSource(this, mProxy);
+        }
+    }
+    
+    StreamSinkSeq getSinks(const Ice::Current&)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+        return mSinks;
+    }
+    
+    FormatSeq getFormats(const Ice::Current&)
+    {
+        // This is not protected by the lock because once created this will never be altered
+        return mFormats;
+    }
+    
+    std::string getId(const Ice::Current&)
+    {
+        // Same goes for the id
+        return mProxy->ice_getIdentity().name;
+    }
+    
+    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
+    {
+        throw MediaFormatSwitchException();
+    }
+
+    /**
+     * Internal function which sets our proxy.
+     */
+    void setProxy(const StreamSourcePrx& proxy)
+    {
+        mProxy = proxy;
+    }
+
+    /**
+     * Internal function which gets our proxy.
+     */
+    StreamSourcePrx getProxy()
+    {
+        return mProxy;
+    }
+
+private:
+    /**
+     * Lock which protects this source.
+     */
+    boost::shared_mutex mLock;
+
+    /**
+     * Media mixer we are associated with.
+     */
+    MediaMixerPtr mMixer;
+
+    /**
+     * Proxy to ourselves.
+     */
+    StreamSourcePrx mProxy;
+
+    /**
+     * Sinks that we are writing media to.
+     */
+    StreamSinkSeq mSinks;
+
+    /**
+     * Formats supported by this source.
+     */
+    FormatSeq mFormats;
+};
+
+/**
+ * Implementation of a StreamSink which provides audio to be mixed into the bridge.
+ */
+class MediaMixerSink : public StreamSink
+{
+public:
+    MediaMixerSink(const MediaMixerPtr& mixer, const MediaMixerSourcePtr& source) :
+        mMixer(mixer), mLocalSource(source) { }
+
+    void write(const FrameSeq& frames, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        mFrames.insert(mFrames.begin(), frames.begin(), frames.end());
+    }
+    
+    void setSource(const StreamSourcePrx& source, const Ice::Current&)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        if (source == 0)
+        {
+            mMixer->removeSink(this, mProxy);
+        }
+        else
+        {
+            mSource = source;
+        }
+    }
+    
+    StreamSourcePrx getSource(const Ice::Current&)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+        return mSource;
+    }
+    
+    FormatSeq getFormats(const Ice::Current&)
+    {
+        return mFormats;
+    }
+    
+    std::string getId(const Ice::Current&)
+    {
+        return mProxy->ice_getIdentity().name;
+    }
+
+    /**
+     * Internal function which returns the latest frame to use in the mixer.
+     */
+    const FramePtr& getFrame()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        if (!mFrames.empty())
+        {
+            mProvidedFrame = mFrames.back();
+            mFrames.pop_back();
+        }
+
+        return mProvidedFrame;
+    }
+
+    /**
+     * Internal function which removes the latest frame, presumably having already been mixed.
+     */
+    const FramePtr popFrame()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        FramePtr frame = mProvidedFrame;
+        mProvidedFrame = 0;
+
+        return frame;
+    }
+
+    /**
+     * Internal function which sets our proxy.
+     */
+    void setProxy(const StreamSinkPrx& proxy)
+    {
+        mProxy = proxy;
+    }
+
+    /**
+     * Internal function which returns our proxy.
+     */
+    StreamSinkPrx getProxy()
+    {
+        return mProxy;
+    }
+
+    /**
+     * Internal function which returns the local source.
+     */
+    MediaMixerSourcePtr getLocalSource()
+    {
+        return mLocalSource;
+    }
+
+private:
+    /**
+     * Lock which protects this sink.
+     */
+    boost::shared_mutex mLock;
+
+    /**
+     * Media mixer we are associated with.
+     */
+    MediaMixerPtr mMixer;
+
+    /**
+     * Pointer to our source of media, this exists within this class because the two
+     * are inherently associated.
+     */
+    MediaMixerSourcePtr mLocalSource;
+
+    /**
+     * Proxy to ourselves.
+     */
+    StreamSinkPrx mProxy;
+
+    /**
+     * Sequence of received frames pending mixing.
+     */
+    FrameSeq mFrames;
+
+    /**
+     * Source of media for this sink.
+     */
+    StreamSourcePrx mSource;
+
+    /**
+     * Formats supported by this sink.
+     */
+    FormatSeq mFormats;
+
+    /**
+     * Frame we provided when getFrame was called.
+     */
+    FramePtr mProvidedFrame;
+};
+
+/**
+ * Simple callback object used when writing to sinks.
+ */
+class WriteCallback : public IceUtil::Shared
+{
+public:
+    void writtenCB()
+    {
+    }
+
+    void failureCB(const Ice::Exception&)
+    {
+    }
+};
+
+/**
+ * Smart pointer for the above WriteCallback class.
+ */
+typedef IceUtil::Handle<WriteCallback> WriteCallbackPtr;
+
+class MediaMixerI
+{
+public:
+    MediaMixerI(const Ice::ObjectAdapterPtr& adapter, const FormatPtr& format) : mAdapter(adapter), mFormat(format),
+										 mTimer(new IceUtil::Timer()),
+										 mCallback(new WriteCallback()) { }
+
+    /**
+     * Lock which protects the mixer.
+     */
+    boost::shared_mutex mLock;
+
+    /**
+     * Object adapter that sinks/sources are on.
+     */
+    Ice::ObjectAdapterPtr mAdapter;
+
+    /**
+     * Format for produced frames.
+     */
+    FormatPtr mFormat;
+
+    /**
+     * Vector of sinks for media.
+     */
+    std::vector<MediaMixerSinkPtr> mSinks;
+
+    /**
+     * Vector of sources for media.
+     */
+    std::vector<MediaMixerSourcePtr> mSources;
+
+    /**
+     * Timer thread used to mix the media.
+     */
+    IceUtil::TimerPtr mTimer;
+
+    /**
+     * Callback object for when writing to sinks.
+     */
+    WriteCallbackPtr mCallback;
+};
+
+/**
+ * Transform function which performs a signed linear saturated add.
+ */
+static short saturatedSignedLinearAdd(short input, short value)
+{
+    int res = input + value;
+
+    if (res > 32767)
+    {
+        return 32767;
+    }
+    else if (res < -32767)
+    {
+        return -32767;
+    }
+    else
+    {
+        return static_cast<short>(res);
+    }
+}
+
+/**
+ * Transform function which performs a signed linear saturated subtract.
+ */
+static short saturatedSignedLinearSubtract(short input, short value)
+{
+    int res = input - value;
+
+    if (res > 32767)
+    {
+        return 32767;
+    }
+    else if (res < -32767)
+    {
+        return -32767;
+    }
+    else
+    {
+        return static_cast<short>(res);
+    }
+}
+
+MediaMixer::MediaMixer(const Ice::ObjectAdapterPtr& adapter, const FormatPtr& format) : mImpl(new MediaMixerI(adapter, format))
+{
+    mImpl->mTimer->schedule(this, IceUtil::Time::milliSeconds(MIXING_INTERVAL));
+}
+
+void MediaMixer::createMixing(const Ice::Identity& sinkId, AsteriskSCF::Media::V1::StreamSinkPrx& mixerSink,
+                              const Ice::Identity& sourceId, AsteriskSCF::Media::V1::StreamSourcePrx& mixerSource)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    // The source is where mixed audio will originate from, within this implementation it essentially acts
+    // as a store of sinks we need to write to for this source
+    Ice::Identity id = sourceId;
+    id.name += ".mixer";
+    MediaMixerSourcePtr source;
+    if (mImpl->mAdapter->find(id))
+    {
+        mixerSource = StreamSourcePrx::uncheckedCast(mImpl->mAdapter->createProxy(id));
+    }
+    else
+    {
+        source = new MediaMixerSource(this);
+        mImpl->mSources.push_back(source);
+        mixerSource = StreamSourcePrx::uncheckedCast(mImpl->mAdapter->add(source, id));
+        source->setProxy(mixerSource);
+    }
+
+    // The sink is where the source will send audio to to be mixed into the bridge, the reason the local source
+    // is associated is because we need to know what media to remove from the mixed frame when writing out
+    id = sinkId;
+    id.name += ".mixer";
+    if (mImpl->mAdapter->find(id))
+    {
+        mixerSink = StreamSinkPrx::uncheckedCast(mImpl->mAdapter->createProxy(id));
+    }
+    else
+    {
+        MediaMixerSinkPtr sink = new MediaMixerSink(this, source);
+        mImpl->mSinks.push_back(sink);
+        mixerSink = StreamSinkPrx::uncheckedCast(mImpl->mAdapter->add(sink, id));
+        sink->setProxy(mixerSink);
+    }
+}
+
+void MediaMixer::runTimerTask()
+{
+    IceUtil::Time start = IceUtil::Time::now();
+
+    std::vector<MediaMixerSinkPtr> sinks;
+
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+        sinks = mImpl->mSinks;
+    }
+
+    FramePtr frame = new Frame();
+    frame->mediaFormat = mImpl->mFormat;
+
+    ShortSeqPayloadPtr mixedPayload = new ShortSeqPayload();
+    frame->payload = mixedPayload;
+
+    for (std::vector<MediaMixerSinkPtr>::const_iterator sink = sinks.begin();
+         sink != sinks.end();
+         ++sink)
+    {
+        FramePtr sinkFrame = (*sink)->getFrame();
+
+        // If no frame exists for us to mix in skip doing so
+        if (!sinkFrame)
+        {
+            continue;
+        }
+
+        ShortSeqPayloadPtr payload = ShortSeqPayloadPtr::dynamicCast(sinkFrame->payload);
+
+        // We only currently support short sequence payloads
+        if (!payload)
+        {
+            continue; 
+       }
+
+        // If no audio exists within the mixedPayload yet just copy this frame in, otherwise mix it in
+        if (!mixedPayload->payload.size())
+        {
+            mixedPayload->payload.resize(payload->payload.size());
+            std::copy(payload->payload.begin(), payload->payload.end(), mixedPayload->payload.begin());
+        }
+        else
+        {
+            std::transform(mixedPayload->payload.begin(), mixedPayload->payload.end(),
+                           payload->payload.begin(), mixedPayload->payload.begin(),
+                           saturatedSignedLinearAdd);
+        }
+    }
+
+    for (std::vector<MediaMixerSinkPtr>::const_iterator sink = sinks.begin();
+         sink != sinks.end();
+         ++sink)
+    {
+        FramePtr sinkFrame = (*sink)->popFrame();
+	FramePtr theirFrame(frame);
+
+        if (sinkFrame)
+        {
+	    theirFrame = FramePtr::dynamicCast(frame->ice_clone());
+
+            ShortSeqPayloadPtr sinkPayload = ShortSeqPayloadPtr::dynamicCast(sinkFrame->payload);
+
+            // Remove the participants own media so they do not hear themselves
+            ShortSeqPayloadPtr payload = new ShortSeqPayload();
+            payload->payload.resize(mixedPayload->payload.size());
+	    std::transform(mixedPayload->payload.begin(), mixedPayload->payload.end(),
+                           sinkPayload->payload.begin(), payload->payload.begin(),
+                           saturatedSignedLinearSubtract);
+            theirFrame->payload = payload;
+        }
+
+        // Send this frame to all the sinks we should
+        StreamSinkSeq sourceSinks = (*sink)->getLocalSource()->getSinks(Ice::Current());
+
+	FrameSeq frames;
+	frames.push_back(theirFrame);
+
+        for (StreamSinkSeq::const_iterator sourceSink = sourceSinks.begin();
+             sourceSink != sourceSinks.end();
+             ++sourceSink)
+        {
+            (*sourceSink)->begin_write(frames, newCallback_StreamSink_write(mImpl->mCallback,
+                                                                            &WriteCallback::writtenCB,
+                                                                            &WriteCallback::failureCB));
+        }
+    }
+
+    IceUtil::Time elapsed = IceUtil::Time::now() - start;
+    mImpl->mTimer->schedule(this, IceUtil::Time::milliSeconds(MIXING_INTERVAL - elapsed.toMilliSeconds()));
+}
+
+void MediaMixer::removeSource(const MediaMixerSourcePtr& source, const StreamSourcePrx& proxy)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSources.erase(std::remove(mImpl->mSources.begin(), mImpl->mSources.end(), source), mImpl->mSources.end());
+    mImpl->mAdapter->remove(proxy->ice_getIdentity());
+}
+
+void MediaMixer::removeSink(const MediaMixerSinkPtr& sink, const StreamSinkPrx& proxy)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSinks.erase(std::remove(mImpl->mSinks.begin(), mImpl->mSinks.end(), sink), mImpl->mSinks.end());
+    mImpl->mAdapter->remove(proxy->ice_getIdentity());
+}
+
+void MediaMixer::stop()
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mTimer->destroy();
+
+    for (std::vector<MediaMixerSinkPtr>::const_iterator sink = mImpl->mSinks.begin();
+         sink != mImpl->mSinks.end();
+         ++sink)
+    {
+        mImpl->mAdapter->remove((*sink)->getProxy()->ice_getIdentity());
+    }
+
+    mImpl->mSinks.clear();
+
+    for (std::vector<MediaMixerSourcePtr>::const_iterator source = mImpl->mSources.begin();
+         source != mImpl->mSources.end();
+         ++source)
+    {
+        mImpl->mAdapter->remove((*source)->getProxy()->ice_getIdentity());
+    }
+
+    mImpl->mSources.clear();
+}
+
+}
+}
diff --git a/src/MediaMixer.h b/src/MediaMixer.h
new file mode 100644
index 0000000..d167e25
--- /dev/null
+++ b/src/MediaMixer.h
@@ -0,0 +1,76 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <IceUtil/Shared.h>
+#include <IceUtil/Timer.h>
+#include <AsteriskSCF/Media/MediaIf.h>
+#include <AsteriskSCF/logger.h>
+#include <boost/shared_ptr.hpp>
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+
+/**
+ * Forward declarations.
+ */
+class MediaMixerSource;
+class MediaMixerSink;
+
+/**
+ * Smart pointer type for MediaMixerSource class.
+ */
+typedef IceUtil::Handle<MediaMixerSource> MediaMixerSourcePtr;
+
+/**
+ * Smart pointer type for MediaMixerSink class.
+ */
+typedef IceUtil::Handle<MediaMixerSink> MediaMixerSinkPtr;
+
+/**
+ * Implementation details for MediaMixer class.
+ */ 
+class MediaMixerI;
+class MediaMixer : public IceUtil::TimerTask
+{
+public:
+    MediaMixer(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::V1::FormatPtr&);
+
+    void createMixing(const Ice::Identity&, AsteriskSCF::Media::V1::StreamSinkPrx&,
+                      const Ice::Identity&, AsteriskSCF::Media::V1::StreamSourcePrx&);
+
+    void removeSource(const MediaMixerSourcePtr&, const AsteriskSCF::Media::V1::StreamSourcePrx&);
+    void removeSink(const MediaMixerSinkPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&);
+
+    void runTimerTask();
+    void stop();
+
+private:
+    /**
+     * Private implementation details for MediaMixer.
+     */
+    boost::shared_ptr<MediaMixerI> mImpl;
+};
+
+/**
+ * Smart pointer for above MediaMixer class.
+ */
+typedef IceUtil::Handle<MediaMixer> MediaMixerPtr;
+
+} // End of namespace BridgeService
+} // End of namespace AsteriskSCF
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 8022a66..27411cc 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -14,6 +14,8 @@
  * at the top of the source tree.
  */
 #include "MediaSplicer.h"
+#include "MediaMixer.h"
+#include <AsteriskSCF/Media/Formats/AudioFormats.h>
 #include <IceUtil/Shared.h>
 #include <IceUtil/Handle.h>
 #include "BridgeServiceConfig.h"
@@ -37,6 +39,7 @@
 //
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Media::Formats::Audio::V1;
 using namespace AsteriskSCF::Replication::BridgeService::V1;
 using namespace std;
 
@@ -385,19 +388,20 @@ private:
 };
 
 QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, 
-    const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
-    const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
-//
-// TODO: This needs to register the streams with an active threaded mixing element.
-//
+                                    const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
+                                    const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer, bool mixer);
+
 class MediaSplicerI : public boost::enable_shared_from_this<MediaSplicerI>
 {
 public:
-    MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger) :
+    MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger,
+                  const Ice::ObjectAdapterPtr& adapter) :
         mCommunicator(communicator),
         mBridgeId(bridgeId),
         mReplicator(replicator),
-        mLogger(logger)
+        mLogger(logger),
+        mUsingMixer(false),
+	mAdapter(adapter)
     {
     }
 
@@ -460,7 +464,7 @@ public:
         // An alternative is to pass back the queued tasks to the caller and let them start and stop the process.
         //
         ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, sessionPrx, existing, 
-            shared_from_this()), mLogger));
+                                                                      shared_from_this(), mUsingMixer), mLogger));
         taskExecutor->start();
     }
 
@@ -493,18 +497,141 @@ public:
         return result;
     }
 
+    void enableMixing()
+    {
+        mUsingMixer = true;
+    }
+
+    void disableMixing()
+    {
+        if (mUsingMixer == false)
+        {
+            return;
+        }
+
+        for (MediaMixers::const_iterator mixer = mMediaMixers.begin();
+             mixer != mMediaMixers.end();
+             ++mixer)
+        {
+            mixer->second->stop();
+        }
+
+        mMediaMixers.clear();
+
+        mUsingMixer = false;
+    }
+
+    void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& pairing)
+    {
+        if (pairing->incomingMediaPairings.empty() || pairing->outgoingMediaPairings.empty())
+        {
+            return;
+        }
+
+        // If this is connected to a mixer we need to set things up
+        if (pairing->incomingMediaPairings.front()->sink->ice_getIdentity().name.rfind(".mixer") == std::string::npos)
+        {
+            return;
+        }
+
+	mUsingMixer = true;
+
+        vector<OutgoingPairing> outgoing;
+        vector<IncomingPairing> incoming;
+
+        findCompatiblePairings(pairing->streams, outgoing, incoming);
+    }
+
+    void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+                                vector<IncomingPairing>& incoming)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+	// We create a copy of the vector so we do not end up connecting multiple streams to the same mixer
+	MediaMixers mixers = mMediaMixers;
+
+        for (StreamInformationDict::const_iterator stream = streams.begin();
+             stream != streams.end();
+             ++stream)
+        {
+            if (stream->second->state == Removed)
+            {
+                continue;
+            }
+
+	    MediaMixerPtr mixer;
+            bool allowed = true;
+
+            // Only allow signed linear formats to create or find a mixer
+            for (FormatSeq::const_iterator format = stream->second->formats.begin();
+                 format != stream->second->formats.end();
+                 ++format)
+            {
+                SignedLinearPtr slin = SignedLinearPtr::dynamicCast((*format));
+
+                if (!slin)
+                {
+                    allowed = false;
+                    break;
+                }
+            }
+
+            // If the stream contains an unsupported format do not go further, do not pass go
+            if (allowed == false)
+            {
+                continue;
+            }
+
+            // Look for a mixer based on the format on this stream
+            std::string format = stream->second->formats.front()->name;
+            MediaMixers::const_iterator existingMixer = mixers.end();
+
+            if (existingMixer == mixers.end())
+            {
+                mLogger(Debug) << FUNLOG << ": creating media mixer for format " << format;
+                mixer = new MediaMixer(mAdapter, stream->second->formats.front());
+                mMediaMixers.insert(make_pair(format, mixer));
+	    }
+	    else
+	    {
+		mLogger(Debug) << FUNLOG << ": using found media mixer for format " << format;
+		mixer = existingMixer->second;
+                mixers.erase(format);
+	    }
+
+            StreamSinkSeq sinks = stream->second->sinks;
+            StreamSourceSeq sources = stream->second->sources;
+
+            // Iterate through sources and sinks to connect them to the mixer
+            // Note that we must have a matching number of sinks and sources, this is so we can cancel out
+            // their media from the mixing
+            while (!sinks.empty() && !sources.empty())
+            {
+                // These proxies will contain the details for the mixer
+                StreamSourcePrx mixerSource;
+                StreamSinkPrx mixerSink;
+
+                // Ask the mixer to create a source and sink for our source and sink
+                mixer->createMixing(sinks.back()->ice_getIdentity(), mixerSink,
+                                    sources.back()->ice_getIdentity(), mixerSource);
+
+                // Now that we have done so we can create pairings
+                outgoing.push_back(OutgoingPairing(sinks.back(), mixerSource));
+                incoming.push_back(IncomingPairing(sources.back(), mixerSink));
+
+                // These sink and source are completely done, don't touch them again
+                sinks.pop_back();
+                sources.pop_back();
+            }
+        }
+    }
+
     void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
                                 vector<IncomingPairing>& incoming,
                                 MediaConnectorIPtr connector,
                                 std::map<std::string, std::string>& connections,
                                 DirectMediaConnectionDict& directConnections)
     {
-        // If no streams are present we can not establish any pairings
-        if (streams.empty())
-        {
-            return;
-        }
-
         boost::shared_lock<boost::shared_mutex> lock(mLock);
 
         for (MediaConnectors::const_iterator i = mConnectors.begin(); i != mConnectors.end(); ++i)
@@ -586,11 +713,17 @@ private:
     string mBridgeId;
     ReplicatorSmartPrx mReplicator;
     Logger mLogger;
+    bool mUsingMixer;
+    Ice::ObjectAdapterPtr mAdapter;
 
     typedef vector<MediaConnectorIPtr> MediaConnectors;
 
     MediaConnectors mConnectors;
 
+    typedef std::map<std::string, MediaMixerPtr> MediaMixers;
+
+    MediaMixers mMediaMixers;
+
     bool hasPreferredCodecOverride()
     {
         //
@@ -676,24 +809,40 @@ private:
 class GetCompatiblePairings : public QueuedTask
 {
 public:
-    GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials) :
+    GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials, bool mixer) :
         QueuedTask("GetCompatiblePairings"),
         mSplicer(splicer),
-        mMaterials(materials)
+        mMaterials(materials),
+        mUsingMixer(mixer)
     {
     }
 
 protected:
     bool executeImpl()
     {
-	mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
-                                         mMaterials->connector, mMaterials->connections, mMaterials->directConnections);
+
+        if (mMaterials->streams.empty())
+        {
+            return true;
+        }
+
+        if (mUsingMixer == true)
+        {
+            mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+        }
+        else
+        {
+            mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
+                                             mMaterials->connector, mMaterials->connections, mMaterials->directConnections);
+        }
+
         return true;
     }
 
 private:
     MediaSplicerIPtr mSplicer;
     MediaConnectorBuilderPtr mMaterials;
+    bool mUsingMixer;
 };
 
 //
@@ -986,8 +1135,8 @@ private:
 };
 
 QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, 
-    const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx, 
-    const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer)
+                                    const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx, 
+                                    const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer, bool mixer)
 {
     QueuedTasks tasks;
     MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
@@ -995,9 +1144,15 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
     materials->sessionPrx = sessionPrx;
     tasks.push_back(new GetStreams(session, materials));
     tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
-    tasks.push_back(new GetCompatiblePairings(splicer, materials));
+    tasks.push_back(new GetCompatiblePairings(splicer, materials, mixer));
     tasks.push_back(new MakeConnections(materials));
-    tasks.push_back(new MakeDirectConnections(materials));
+
+    // We can not establish direct connections when doing mixing
+    if (mixer == false)
+    {
+        tasks.push_back(new MakeDirectConnections(materials));
+    }
+
     return tasks;
 }
 
@@ -1007,8 +1162,9 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
 using namespace AsteriskSCF::BridgeService;
 
 MediaSplicer::MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
-        const Logger& logger) :
-    mImpl(new MediaSplicerI(comm, bridgeId, replicator, logger))
+                           const Logger& logger,
+                           const Ice::ObjectAdapterPtr& adapter) :
+    mImpl(new MediaSplicerI(comm, bridgeId, replicator, logger, adapter))
 {
 }
 
@@ -1021,3 +1177,18 @@ MediaConnectorPtr MediaSplicer::createReplica(const SessionPairingPtr& pairings)
 {
     return mImpl->createReplica(pairings);
 }
+
+void MediaSplicer::enableMixing()
+{
+    mImpl->enableMixing();
+}
+
+void MediaSplicer::disableMixing()
+{
+    mImpl->disableMixing();
+}
+
+void MediaSplicer::update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& pairing)
+{
+    mImpl->update(pairing);
+}
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index 31b3f19..18aa36f 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -85,10 +85,13 @@ class MediaSplicer : public IceUtil::Shared
 {
 public:
     MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
-            const AsteriskSCF::System::Logging::Logger& logger);
+                 const AsteriskSCF::System::Logging::Logger& logger, const Ice::ObjectAdapterPtr&);
     
     void connect(const SessionWrapperPtr& session, const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx);
     MediaConnectorPtr createReplica(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& pairings);
+    void enableMixing();
+    void disableMixing();
+    void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr&);
 
 private:
     boost::shared_ptr<MediaSplicerI> mImpl;
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index 7d06ef6..bd3f1ff 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -26,14 +26,15 @@ using namespace AsteriskSCF;
 using namespace std;
 
 SessionCollection::SessionCollection(const Ice::CommunicatorPtr& comm, const string& bridgeId,
-        const ReplicatorSmartPrx& replicator,
-        const Logger& logger) :
+                                     const ReplicatorSmartPrx& replicator,
+                                     const Logger& logger,
+                                     const Ice::ObjectAdapterPtr& adapter) :
     mCommunicator(comm),
     mReplicator(replicator),
     mLogger(logger),
     mBridgeId(bridgeId),
     mSessionCounter(0),
-    mSplicer(new MediaSplicer(comm, bridgeId, replicator, logger))
+    mSplicer(new MediaSplicer(comm, bridgeId, replicator, logger, adapter))
 {
 }
  
@@ -86,6 +87,13 @@ SessionWrapperPtr SessionCollection::addSession(const SessionPrx& session)
     SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mSplicer,  mReplicator, mLogger));
     mMap[key] = newSession;
     ++mSessionCounter;
+
+    // If this collection now requires mixing turn it on
+    if (mMap.size() > 2)
+    {
+        mSplicer->enableMixing();
+    }
+
     return newSession;
 }
 
@@ -200,4 +208,10 @@ void SessionCollection::removeSession(const BridgedSessionPtr& session)
     {
         mMap.erase(i);
     }
+
+    // If this collection does not require mixing make sure it is disabled
+    if (mMap.size() < 3)
+    {
+	mSplicer->disableMixing();
+    }
 }
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index d306da9..d758e85 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -47,7 +47,8 @@ class SessionCollection : public IceUtil::Shared
 public:
     
     SessionCollection(const Ice::CommunicatorPtr& communicator, const std::string& bridgeId,
-            const ReplicatorSmartPrx& replicator, const AsteriskSCF::System::Logging::Logger& logger);
+                      const ReplicatorSmartPrx& replicator, const AsteriskSCF::System::Logging::Logger& logger,
+                      const Ice::ObjectAdapterPtr& adapter);
 
     /**
      * Obtains the SessionWrapper instance for the specified proxy.
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 38c0184..df4b8ae 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -336,3 +336,4 @@ void DisconnectTelephonyOperation::disconnectSinks
             (*i)->removeSinks(sinksToRemove);
     }
 }
+
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 4adbd59..11b59b8 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -508,6 +508,9 @@ void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
 {
     mLogger(Trace) << FUNLOG << " for " << mId;
     boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+    mSplicer->update(pairings);
+
     if (mConnector)
     {
         mConnector->update(pairings);
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 6fc3f43..a03d78b 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -172,7 +172,6 @@ private:
     AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mSessionController;
     AsteriskSCF::SessionCommunications::PartyIdentification::V1::ConnectedLinePtr mConnectedLine;
     bool mConnectedLineSet;
-   
 
     /**
      * Sends changes to the replication service. This should never occur
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9773c64..979d462 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -30,6 +30,7 @@ astscf_component_add_files(bridge_unit_tests ../src/SessionCollection.cpp)
 astscf_component_add_files(bridge_unit_tests ../src/SessionOperations.cpp)
 astscf_component_add_files(bridge_unit_tests ../src/SessionWrapper.cpp)
 astscf_component_add_files(bridge_unit_tests ../src/MediaSplicer.cpp)
+astscf_component_add_files(bridge_unit_tests ../src/MediaMixer.cpp)
 astscf_component_add_slices(bridge_unit_tests PROJECT AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice)
 astscf_component_add_files(bridge_unit_tests UnitTests.cpp)
 astscf_component_add_ice_libraries(bridge_unit_tests Ice)
diff --git a/test/UnitTests.cpp b/test/UnitTests.cpp
index 8a8ebb7..7af1dd8 100644
--- a/test/UnitTests.cpp
+++ b/test/UnitTests.cpp
@@ -351,7 +351,7 @@ BOOST_FIXTURE_TEST_CASE(testGetSeq, Fixture)
 {
     IceEnvironment iceEnv;
     AsteriskSCF::BridgeService::ReplicatorSmartPrx rep;
-    SessionCollectionPtr collection(new SessionCollection(iceEnv.communicator(), "test", rep, getLogger()));
+    SessionCollectionPtr collection(new SessionCollection(iceEnv.communicator(), "test", rep, getLogger(), 0));
 
     SessionPrx dummy = SessionPrx::uncheckedCast(iceEnv.communicator()->stringToProxy("foo0:default"));
     collection->replicaUpdate(createBridgedSession(Added, dummy));

-----------------------------------------------------------------------


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list