[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Sun Sep 4 11:04:02 CDT 2011


branch "master" has been updated
       via  7ae703dcaf863f700b577ec6796e3ef81bbb0ebf (commit)
      from  8b9064121d04e22ab3277b8549f0ce8c6e2f69a2 (commit)

Summary of changes:
 .../BridgeService/BridgeReplicatorIf.ice           |    2 +-
 src/BridgeImpl.cpp                                 |  138 ++++++-
 src/MediaSplicer.cpp                               |  438 ++++++++------------
 src/MediaSplicer.h                                 |    5 +
 src/SessionOperations.cpp                          |  123 ++++++
 src/SessionOperations.h                            |   41 ++
 src/SessionWrapper.cpp                             |   23 +
 src/SessionWrapper.h                               |   13 +
 8 files changed, 527 insertions(+), 256 deletions(-)


- Log -----------------------------------------------------------------
commit 7ae703dcaf863f700b577ec6796e3ef81bbb0ebf
Author: Joshua Colp <jcolp at digium.com>
Date:   Sun Sep 4 13:08:41 2011 -0300

    Add support for multiple streams using the SessionController interface.

diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 5a7892c..fb0b7ec 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -97,7 +97,7 @@ class SessionPairing extends ReplicatedStateItem
 {
     string bridgeKey;
     string sessionKey;
-    AsteriskSCF::Media::V1::Session* mediaSession;
+    AsteriskSCF::Media::V1::StreamInformationDict streams;
     MediaPairingSeq incomingMediaPairings;
     MediaPairingSeq outgoingMediaPairings;
 };
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index a9cd9e1..a7b2feb 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -78,7 +78,6 @@ public:
     // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
     //
     void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
-    void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
     void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
             const Ice::Current&);
 
@@ -381,6 +380,140 @@ private:
     SessionsTrackerPtr mTracker;
 };
 
+class BridgeSessionController : public SessionController
+{
+public:
+    BridgeSessionController(const BridgeImplPtr& bridge,
+                            const SessionWrapperPtr& self,
+                            const Logger& logger) : mBridge(bridge), mSelf(self), mLogger(logger) { }
+
+    void changeStreamStates_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+                                  const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
+    {
+        // We do not care about stream state changes at this point in time
+        cb->ice_response();
+    }
+
+    void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+                          const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+    {
+        AddStreamsOperationPtr op = new AddStreamsOperation(cb, mSelf, streams, mLogger);
+        mBridge->getSessions()->visitSessions(*op);
+    }
+
+    void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
+                             const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+    {
+        RemoveStreamsOperation op(mSelf, streams);
+        mBridge->getSessions()->visitSessions(op);
+        cb->ice_response();
+    }
+
+private:
+    BridgeImplPtr mBridge;
+    SessionWrapperPtr mSelf;
+    Logger mLogger;
+};
+
+class SetAndGetSessionControllerTask : public QueuedTask
+{
+public:
+    SetAndGetSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
+                                   const SessionCollectionPtr& sessionCollection,
+                                   const SessionSeq& sessions,
+                                   const BridgeImplPtr& bridge,
+                                   const Logger& logger):
+        QueuedTask("SetAndGetSessionControllerTask"),
+        mAdapter(adapter),
+        mSessionManager(sessionCollection),
+        mSessions(sessions),
+        mTracker(new SessionsTracker()),
+        mBridge(bridge),
+        mLogger(logger)
+        {
+        }
+
+protected:
+    bool executeImpl()
+    {
+        bool tasksDispatched = false;
+        for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+        {
+            SessionWrapperPtr session = mSessionManager->getSession(*i);
+            if (session == 0)
+            {
+                //
+                // This shouldn't happen!
+                //
+                mTracker->addExceptionMessage(*i, "session not found");
+                continue;
+            }
+            tasksDispatched = true;
+            SessionControllerPtr controller = new BridgeSessionController(mBridge, session, mLogger);
+	    std::string identity = (*i)->ice_getIdentity().name;
+	    identity += ".bridgecontroller";
+	    SessionControllerPrx controllerPrx =
+		SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
+	    (*i)->begin_setAndGetSessionController(controllerPrx, newCallback_Session_setAndGetSessionController(this,
+                                                   &SetAndGetSessionControllerTask::get, &SetAndGetSessionControllerTask::failed), session);
+        }
+        return !tasksDispatched;
+    }
+
+    void get(const SessionControllerPrx& controller, const SessionWrapperPtr& session)
+    {
+        session->setSessionController(controller);
+        mTracker->add(session->getSession());
+        if (mTracker->responseCount() == mSessions.size())
+        {
+            mListener->succeeded();
+        }
+    }
+
+    void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+    {
+        //
+        // TODO:
+        // * Log exception.
+        // * Rollback
+        // * Interpret exception and decide whether or not to fail the entire operation.
+        // Currently the semantics allow some operations to fail.
+        //
+        mTracker->addException(session->getSession(), ex);
+        try
+        {
+            //
+            // We want to make sure that the session is not left lying around. The session collection
+            // will take care of cleaning it up as long as it is marked as destroyed.
+            //
+            session->destroy();
+        }
+        catch(...)
+        {
+        }
+        if (mTracker->responseCount() == mSessions.size())
+        {
+            SessionErrorSeq exceptions = mTracker->getExceptions();
+            if (exceptions.size() == mSessions.size())
+            {
+                mListener->failed();
+            }
+            else
+            {
+                mListener->succeeded();
+            }
+        }
+    }
+    
+private:
+    Ice::ObjectAdapterPtr mAdapter;
+    SessionCollectionPtr mSessionManager;
+    SessionSeq mSessions;
+    SessionsTrackerPtr mTracker;
+    BridgeImplPtr mBridge;
+    Logger mLogger;
+};
+
 class AddToListeners : public QueuedTask
 {
 public:
@@ -563,6 +696,7 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
         QueuedTasks tasks;
         tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
         tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
+	tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
         tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
         tasks.push_back(new UpdateTask(this));
         ExecutorPtr executor(new Executor(tasks, mLogger));
@@ -827,6 +961,7 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
         tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
         tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
         tasks.push_back(new AddToListeners(mListeners, tracker, cookies));
+	tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
         tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
         tasks.push_back(new UpdateTask(this));
         ExecutorPtr executor(new Executor(tasks, mLogger));
@@ -1037,6 +1172,7 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
     SessionsTrackerPtr tracker(new SessionsTracker);
     tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
     tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
+    tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
     tasks.push_back(new UpdateTask(this));
 }
 
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index e4992fb..a6a0a1e 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -68,13 +68,12 @@ typedef vector<IncomingPairing> IncomingPairings;
 struct MediaConnectorBuilder : public IceUtil::Shared
 {
     AsteriskSCF::SessionCommunications::V1::SessionPrx sessionPrx;
-    AsteriskSCF::Media::V1::SessionPrx mediaSession;
-    AsteriskSCF::Media::V1::StreamSourceSeq sources;
-    AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+    AsteriskSCF::Media::V1::StreamInformationDict streams;
     MediaConnectorIPtr peer;
     MediaConnectorIPtr connector;
     OutgoingPairings outgoingPairings;
     IncomingPairings incomingPairings;
+    std::map<std::string, std::string> connections;
 };
 typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
 
@@ -91,9 +90,9 @@ public:
             const std::string& sessionId,
             const ReplicatorSmartPrx& replicator,
             const Logger& logger) :
-        mMedia(data->mediaSession),
         mOutgoing(data->outgoingPairings),
         mIncoming(data->incomingPairings),
+        mStreams(data->streams),
         mPeer(data->peer), 
         mConnected(true),
         mBridgeId(bridgeId),
@@ -111,7 +110,7 @@ public:
             const MediaConnectorPtr& peer,
             const ReplicatorSmartPrx& replicator,
             const Logger& logger) :
-        mMedia(pairing->mediaSession),
+        mStreams(pairing->streams),
         mPeer(peer),
         mConnected(true),
         mBridgeId(pairing->bridgeKey),
@@ -165,7 +164,7 @@ public:
     }
 
     void update(const MediaConnectorPtr& peer, const OutgoingPairings& outgoing,
-            const IncomingPairings& incoming)
+                const IncomingPairings& incoming, const std::map<std::string, std::string>& newConnections)
     {
         mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
         SessionPairingPtr currentState;
@@ -174,6 +173,7 @@ public:
             mPeer = peer;
             mOutgoing = outgoing;
             mIncoming = incoming;
+            mConnections = newConnections;
             currentState = createUpdate();
         }
         pushUpdate(currentState);
@@ -184,7 +184,7 @@ public:
         if (mReplicator)
         {
             SessionPairingPtr currentState(new SessionPairing);
-            currentState->mediaSession = mMedia;
+            currentState->streams = mStreams;
             currentState->bridgeKey = mBridgeId;
             currentState->sessionKey = mSessionId;
             currentState->key = mKey;
@@ -230,9 +230,13 @@ public:
     void destroy()
     {
         SessionPairingPtr newState;
+	vector<OutgoingPairing> outgoing;
+	vector<IncomingPairing> incoming;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
+	    outgoing = mOutgoing;
             mOutgoing.clear();
+	    incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
             newState = createUpdate();
@@ -247,37 +251,32 @@ public:
         // it's media session constituents to stop communicating with anything
         // else. This also used to try one-ways, that's probably a bad idea too.
         //
-        try
+	for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+             pairing != outgoing.end();
+             ++pairing)
         {
-            StreamSourceSeq sources = mMedia->getSources();
-            for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+            try
             {
-                try
-                {
-//                    (*i)->setSink(StreamSinkPrx());
-// TODO: call removeSink
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                pairing->first->setSource(StreamSourcePrx());
             }
-            StreamSinkSeq sinks = mMedia->getSinks();
-            for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+            catch (const Ice::Exception& ex)
             {
-                try
-                {
-                    (*i)->setSource(StreamSourcePrx());
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
             }
         }
-        catch (const Ice::Exception& ex)
+
+        for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+             pairing != incoming.end();
+             ++pairing)
         {
-            mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            try
+            {
+                pairing->first->removeSink(pairing->second);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
         }
     }
 
@@ -301,9 +300,13 @@ public:
     {
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
         SessionPairingPtr newState;
+        vector<OutgoingPairing> outgoing;
+        vector<IncomingPairing> incoming;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
+            outgoing = mOutgoing;
             mOutgoing.clear();
+            incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
             newState = createUpdate();
@@ -318,37 +321,32 @@ public:
         // it's media session constituents to stop communicating with anything
         // else. This also used to try one-ways, that's probably a bad idea too.
         //
-        try
+        for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+             pairing != outgoing.end();
+             ++pairing)
         {
-            StreamSourceSeq sources = mMedia->getSources();
-            for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+            try
             {
-                try
-                {
-//                    (*i)->setSink(StreamSinkPrx());
-// TODO: Call removeSink
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                pairing->first->setSource(StreamSourcePrx());
             }
-            StreamSinkSeq sinks = mMedia->getSinks();
-            for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+            catch (const Ice::Exception& ex)
             {
-                try
-                {
-                    (*i)->setSource(StreamSourcePrx());
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
             }
         }
-        catch (const Ice::Exception& ex)
+
+        for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+             pairing != incoming.end();
+             ++pairing)
         {
-            mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            try
+            {
+                pairing->first->removeSink(pairing->second);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
         }
 
         return true;
@@ -359,11 +357,21 @@ public:
         return mKey;
     }
 
+    StreamInformationDict streams()
+    {
+        return mStreams;
+    }
+
+    std::map<std::string, std::string> connections()
+    {
+        return mConnections;
+    }
+
 private:
     boost::shared_mutex mLock;
-    AsteriskSCF::Media::V1::SessionPrx mMedia;
     vector<OutgoingPairing> mOutgoing;
     vector<IncomingPairing> mIncoming;
+    StreamInformationDict mStreams;
     MediaConnectorPtr mPeer;
     bool mConnected;
     string mBridgeId;
@@ -372,6 +380,7 @@ private:
     Ice::Long mRepCounter;
     ReplicatorSmartPrx mReplicator;
     Logger mLogger;
+    std::map<std::string, std::string> mConnections;
 };
 
 QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, 
@@ -404,11 +413,11 @@ public:
         //
         if (data->peer)
         {
-            data->peer->update(connector, data->outgoingPairings, data->incomingPairings);
+            data->peer->update(connector, data->outgoingPairings, data->incomingPairings, data->connections);
         }
         connector->initialUpdate();
         boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+	mConnectors.push_back(connector);
         return connector;
     }
 
@@ -422,13 +431,13 @@ public:
         //
         // Reap.
         //
-        MediaSessions::iterator i = mSessions.begin();
-        while (i != mSessions.end())
+        MediaConnectors::iterator i = mConnectors.begin();
+        while (i != mConnectors.end())
         {
-            if (!i->connector->isConnected())
+            if (!(*i)->isConnected())
             {
                 mLogger(Debug) << FUNLOG << ": reaping a connector";
-                i = mSessions.erase(i);
+                i = mConnectors.erase(i);
                 continue;
             }
             ++i;
@@ -439,9 +448,9 @@ public:
         // one of them is going to have an empty connector.
         //
         MediaConnectorIPtr existing;
-        if (mSessions.size() == 1)
+        if (mConnectors.size() == 1)
         {
-            existing = mSessions.back().connector;
+            existing = mConnectors.back();
             existing->clearConnections();
         }
 
@@ -459,126 +468,112 @@ public:
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         MediaConnectorIPtr result;
 
-        MediaSessions::iterator i = mSessions.begin();
-        while (i != mSessions.end())
+        MediaConnectors::iterator i = mConnectors.begin();
+        while (i != mConnectors.end())
         {
-            if (!i->connector->isConnected())
+            if (!(*i)->isConnected())
             {
                 mLogger(Debug) << FUNLOG << ": reaping a connector";
-                MediaSessions::iterator t = i;
-                i = mSessions.erase(t);
+                MediaConnectors::iterator t = i;
+                i = mConnectors.erase(t);
                 continue;
             }
             ++i;
         }
 
         MediaConnectorPtr existing;
-        if (mSessions.size() == 1)
+        if (mConnectors.size() == 1)
         {
-            existing = mSessions.back().connector;
+            existing = mConnectors.back();
             existing->clearConnections();
         }
         result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
-        mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+        mConnectors.push_back(result);
         return result;
     }
 
-    vector<OutgoingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSinkSeq& sinks)
+    void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+                                vector<IncomingPairing>& incoming,
+                                MediaConnectorIPtr connector,
+                                std::map<std::string, std::string>& connections)
     {
-        vector<StreamSourceSeq> allSources;
-
+        // If no streams are present we can not establish any pairings
+        if (streams.empty())
         {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            //
-            // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
-            // will come into a mix and go out.
-            //
-            for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
-            {
-                if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
-                {
-                    continue;
-                }
-                allSources.push_back(i->mediaSession->getSources());
-            }
+            return;
         }
 
-        vector<OutgoingPairing> result;
-        for (StreamSinkSeq::const_iterator sink = sinks.begin(); sink != sinks.end(); ++sink)
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+        for (MediaConnectors::const_iterator i = mConnectors.begin(); i != mConnectors.end(); ++i)
         {
-            bool connected = false;
-            for (vector<StreamSourceSeq>::iterator sources = allSources.begin(); sources != allSources.end() && !connected; 
-                ++sources)
+            if ((*i)->id() == connector->id())
             {
-                for (StreamSourceSeq::iterator source = sources->begin(); source != sources->end(); ++source)
-                {
-                    if (canAdapt(*sink, *source))
-                    {
-                        result.push_back(OutgoingPairing(*sink, *source));
-                        sources->erase(source);
-                        connected = true;
-                        break;
-                    }
-                }
+                continue;
             }
-            if (!connected)
+
+            StreamInformationDict theirStreams = (*i)->streams();
+
+            // If no streams are present go ahead and skip it
+            if (theirStreams.empty())
             {
-                //
-                // TODO: We couldn't establish a proper connection for this sink. What happens
-                // here is probably a policy thing.
-                //
+                continue;
             }
-        }
-        return result;
-    }
 
-    vector<IncomingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSourceSeq& sources)
-    {
-        vector<StreamSinkSeq> allSinks;
-        {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            //
-            // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
-            // will come into a mix and go out.
-            //
-            for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+            // Iterate our own streams looking for any that can be connected to this session
+            for (StreamInformationDict::const_iterator stream = streams.begin();
+                 stream != streams.end();
+                 ++stream)
             {
-                if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
+                if (stream->second->state == Removed)
                 {
                     continue;
                 }
-                allSinks.push_back(i->mediaSession->getSinks());
-            }
-        }
 
-        vector<IncomingPairing> result;
-        for (StreamSourceSeq::const_iterator source = sources.begin(); source != sources.end(); ++source)
-        {
-            bool connected = false;
-            for (vector<StreamSinkSeq>::iterator sinks = allSinks.begin(); sinks != allSinks.end() && !connected; ++sinks)
-            {
-                for (StreamSinkSeq::iterator sink = sinks->begin(); sink != sinks->end(); ++sink)
+                for (StreamInformationDict::const_iterator theirStream = theirStreams.begin();
+                     theirStream != theirStreams.end();
+                     ++theirStream)
                 {
-                    if (canAdapt(*source, *sink))
+                    if (theirStream->second->state == Removed)
                     {
-                        result.push_back(IncomingPairing(*source, *sink));
-                        sink = sinks->erase(sink);
-                        connected = true;
-                        break;
+                        continue;
                     }
+
+                    // Is this stream compatible with the other stream?
+                    if (haveCommonFormats(stream->second, theirStream->second) == false)
+                    {
+                        continue;
+                    }
+
+		    // Connect all available sources and sinks together
+		    StreamSinkSeq sinks = stream->second->sinks;
+		    StreamSourceSeq sources = stream->second->sources;
+
+		    while (!sinks.empty() && !theirStream->second->sources.empty())
+		    {
+                        outgoing.push_back(OutgoingPairing(sinks.back(), theirStream->second->sources.back()));
+			sinks.pop_back();
+			theirStream->second->sources.pop_back();
+                    }
+
+		    while (!sources.empty() && !theirStream->second->sinks.empty())
+                    {
+                        incoming.push_back(IncomingPairing(sources.back(), theirStream->second->sinks.back()));
+			sources.pop_back();
+			theirStream->second->sinks.pop_back();
+                    }
+
+		    // Record this stream connection
+		    connections.insert(make_pair(theirStream->first, stream->first));
+
+                    // This stream has been connected so no longer consider it
+                    theirStreams.erase(theirStream->first);
+                    break;
                 }
             }
-            if (!connected)
-            {
-                //
-                // TODO: We couldn't establish a proper connection for this source! What happens now
-                // should be a matter of policy.
-                //
-            }
         }
-        return result;
     }
-    
+
 private:
 
     boost::shared_mutex mLock;
@@ -587,30 +582,9 @@ private:
     ReplicatorSmartPrx mReplicator;
     Logger mLogger;
 
-    struct MediaSessionStruct
-    {
-        SessionPrx mediaSession;
-        MediaConnectorIPtr connector;
-
-        MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
-            mediaSession(session),
-            connector(con)
-        {
-        }
-    };
-
-    typedef vector<MediaSessionStruct> MediaSessions;
+    typedef vector<MediaConnectorIPtr> MediaConnectors;
 
-    MediaSessions mSessions;
-
-    bool needMixing()
-    {
-        //
-        // TODO: We are only supporting single stream with two endpoints for the moment, so no mixing will
-        // be required.
-        //
-        return false;
-    }
+    MediaConnectors mConnectors;
 
     bool hasPreferredCodecOverride()
     {
@@ -622,93 +596,50 @@ private:
         return false;
     }
 
-    bool canAdapt(const Media::V1::StreamSourcePrx&, const Media::V1::StreamSinkPrx&)
-    {
-        //
-        // For the moment we are assuming two streams that can definitely talk to each other.
-        //
-        return true;
-    }
-
-    bool canAdapt(const Media::V1::StreamSinkPrx&, const Media::V1::StreamSourcePrx&)
+    bool canAdapt(const StreamInformationPtr&, const StreamInformationPtr&)
     {
-        //
-        // For the moment we are assuming two streams that can definitely talk to each other.
-        //
-        return true;
-    }
-};
-
-class GetMediaSession : public QueuedTask
-{
-public:
-    GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
-        QueuedTask("GetMediaSession"),
-        mSession(session),
-        mMaterials(materials)
-    {
-    }
-
-protected:
-    bool executeImpl()
-    {
-        mMaterials->sessionPrx->begin_getMediaSession(
-            AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
-                        &GetMediaSession::done, &GetMediaSession::failed));
+        // For the moment we can not transcode
         return false;
     }
 
-    void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+    bool haveCommonFormats(const StreamInformationPtr& stream0, const StreamInformationPtr& stream1)
     {
-        mMaterials->mediaSession = mediaSession;
-        mListener->succeeded();
-    }
+	for (FormatSeq::const_iterator format0 = stream0->formats.begin();
+             format0 != stream0->formats.end();
+             ++format0)
+        {
+            for (FormatSeq::const_iterator format1 = stream1->formats.begin();
+                 format1 != stream1->formats.end();
+                 ++format1)
+            {
+                // Is the name alike?
+                if ((*format0)->name != (*format1)->name)
+                {
+                    continue;
+                }
 
-    void failed(const Ice::Exception&)
-    {
-        mListener->failed();
-    }
-private:
-    SessionWrapperPtr mSession;
-    MediaConnectorBuilderPtr mMaterials;
-};
+                // Is a format operations service available, and if so are they compatible?
+                if ((*format0)->operations &&
+                    (((*format0)->operations != (*format1)->operations ||
+                      ((*format0)->operations->checkCompatible((*format0), (*format1)) == false))))
+                {
+                    continue;
+                }
 
-class GetSinks : public QueuedTask
-{
-public:
-    GetSinks(const MediaConnectorBuilderPtr& materials) :
-        QueuedTask("GetSinks"),
-        mMaterials(materials)
-    {
-    }
+                return true;
+            }
+        }
 
-protected:
-    bool executeImpl()
-    {
-        mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
-                        &GetSinks::done, &GetSinks::failed));
         return false;
     }
-
-    void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
-    {
-        mMaterials->sinks = sinks;
-        mListener->succeeded();
-    }
-
-    void failed(const Ice::Exception&)
-    {
-        mListener->failed();
-    }
-private:
-    MediaConnectorBuilderPtr mMaterials;
 };
 
-class GetSources : public QueuedTask
+class GetStreams : public QueuedTask
 {
 public:
-    GetSources(const MediaConnectorBuilderPtr& materials) :
-        QueuedTask("GetSources"),
+    GetStreams(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("GetStreams"),
+        mSession(session),
         mMaterials(materials)
     {
     }
@@ -716,14 +647,15 @@ public:
 protected:
     bool executeImpl()
     {
-        mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
-                        &GetSources::done, &GetSources::failed));
+        mMaterials->sessionPrx->begin_getStreams(
+            AsteriskSCF::SessionCommunications::V1::newCallback_Session_getStreams(this,
+                        &GetStreams::done, &GetStreams::failed));
         return false;
     }
 
-    void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+    void done(const AsteriskSCF::Media::V1::StreamInformationDict& streams)
     {
-        mMaterials->sources = sources;
+        mMaterials->streams = streams;
         mListener->succeeded();
     }
 
@@ -731,8 +663,8 @@ protected:
     {
         mListener->failed();
     }
-    
 private:
+    SessionWrapperPtr mSession;
     MediaConnectorBuilderPtr mMaterials;
 };
 
@@ -749,8 +681,8 @@ public:
 protected:
     bool executeImpl()
     {
-        mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sources);
-        mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sinks);
+	mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
+                                         mMaterials->connector, mMaterials->connections);
         return true;
     }
 
@@ -897,7 +829,7 @@ protected:
         {
             if (mMaterials->connector)
             {
-                mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+                mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings, mMaterials->connections);
             }
             mListener->succeeded();
         }
@@ -919,7 +851,7 @@ protected:
         {
             if (mMaterials->connector)
             {
-                mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+                mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings, mMaterials->connections);
             }
             mListener->succeeded();
         }
@@ -995,9 +927,7 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
     MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
     materials->peer = peer;
     materials->sessionPrx = sessionPrx;
-    tasks.push_back(new GetMediaSession(session, materials));
-    tasks.push_back(new GetSinks(materials));
-    tasks.push_back(new GetSources(materials));
+    tasks.push_back(new GetStreams(session, materials));
     tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
     tasks.push_back(new GetCompatiblePairings(splicer, materials));
     tasks.push_back(new MakeConnections(materials));
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index e00b2a9..31b3f19 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -72,6 +72,11 @@ public:
      * Replication support function.
      **/
     virtual void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& update) = 0;
+
+    /**
+     * Return connection information.
+     */
+    virtual std::map<std::string, std::string> connections() = 0;
 };
 typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
  
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 58670a9..f7f8508 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -100,3 +100,126 @@ void RelayIndication::operator()(const SessionWrapperPtr& session)
         }
     }
 }
+
+AddStreamsOperation::AddStreamsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+                                         const SessionWrapperPtr& self,
+                                         const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+                                         const Logger& logger) :
+    mCb(cb),
+    mSelf(self),
+    mStreams(streams),
+    mIgnore(false),
+    mLogger(logger)
+{
+}
+
+void AddStreamsOperation::operator()(const SessionWrapperPtr& session)
+{
+    if (mIgnore == true || mSelf == session)
+    {
+        return;
+    }
+
+    SessionControllerPrx controller = session->getSessionController();
+
+    if (!controller)
+    {
+        return;
+    }
+
+    // Go ahead and request that the streams be added
+    controller->begin_addStreams(mStreams, newCallback_SessionController_addStreams(this,
+                                                                                    &AddStreamsOperation::added,
+                                                                                    &AddStreamsOperation::failed),
+                                 session);
+
+    // We only allow requesting streams on a single session at this point
+    mIgnore = true;
+}
+
+void AddStreamsOperation::added(const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+                                const SessionWrapperPtr& session)
+{
+    // If any streams were added, unplug and set up media again on both sessions so it flows
+    if (!streams.empty())
+    {
+        session->unplugMedia();
+        mSelf->unplugMedia();
+        session->setupMedia();
+        mSelf->setupMedia();
+    }
+
+    // Now that everything is ready go ahead and respond
+    mCb->ice_response(streams);
+}
+
+void AddStreamsOperation::failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+{
+    // The request failed, so tell the session that initiated this operation that no streams were added
+    mCb->ice_response(AsteriskSCF::Media::V1::StreamInformationDict());
+
+    mLogger(Debug) << FUNLOG << ": (" << session->getSession() << ") " << ex.what();
+}
+
+RemoveStreamsOperation::RemoveStreamsOperation(const SessionWrapperPtr& self,
+                                               const AsteriskSCF::Media::V1::StreamInformationDict& streams) :
+    mSelf(self),
+    mStreams(streams),
+    mIgnore(false)
+{
+}
+
+void RemoveStreamsOperation::operator()(const SessionWrapperPtr& session)
+{
+    if (mIgnore == true || mSelf == session)
+    {
+        return;
+    }
+
+    AsteriskSCF::Media::V1::StreamInformationDict remove;
+    std::map<std::string, std::string> connections = session->connections();
+
+    // Determine the streams we actually need to remove
+    for (AsteriskSCF::Media::V1::StreamInformationDict::const_iterator stream = mStreams.begin();
+	 stream != mStreams.end();
+	 ++stream)
+    {
+	std::map<std::string, std::string>::const_iterator connection = connections.find(stream->first);
+
+	if (connection != connections.end())
+	{
+	    remove.insert(make_pair(connection->second, new AsteriskSCF::Media::V1::StreamInformation()));
+	}
+    }
+
+    if (remove.empty())
+    {
+        return;
+    }
+
+    SessionControllerPrx controller = session->getSessionController();
+
+    if (!controller)
+    {
+        return;
+    }
+
+    controller->begin_removeStreams(remove, newCallback_SessionController_removeStreams(this,
+                                                                                        &RemoveStreamsOperation::removed,
+                                                                                        &RemoveStreamsOperation::failed),
+                                    session);
+
+    mIgnore = true;
+}
+
+void RemoveStreamsOperation::removed(const SessionWrapperPtr& session)
+{
+    session->unplugMedia();
+    mSelf->unplugMedia();
+    session->setupMedia();
+    mSelf->setupMedia();
+}
+
+void RemoveStreamsOperation::failed(const Ice::Exception&, const SessionWrapperPtr&)
+{
+}
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index 97726fc..ce73388 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -16,6 +16,7 @@
 #pragma once
 
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/logger.h>
 #include "BridgeReplicatorIf.h"
 #include "SessionWrapper.h"
@@ -77,6 +78,46 @@ private:
     bool mIncludeConnected;
 };
 
+class AddStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+    AddStreamsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+                        const SessionWrapperPtr& self,
+                        const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+                        const AsteriskSCF::System::Logging::Logger& logger);
+
+    void operator()(const SessionWrapperPtr& session);
+
+    void added(const AsteriskSCF::Media::V1::StreamInformationDict& streams, const SessionWrapperPtr& session);
+    void failed(const Ice::Exception& ex, const SessionWrapperPtr& session);
+
+private:
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr mCb;
+    SessionWrapperPtr mSelf;
+    AsteriskSCF::Media::V1::StreamInformationDict mStreams;
+    bool mIgnore;
+    AsteriskSCF::System::Logging::Logger mLogger;
+};
+
+typedef IceUtil::Handle<AddStreamsOperation> AddStreamsOperationPtr;
+
+class RemoveStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+    RemoveStreamsOperation(const SessionWrapperPtr& self,
+                        const AsteriskSCF::Media::V1::StreamInformationDict& streams);
+    
+    void operator()(const SessionWrapperPtr& session);
+
+    void removed(const SessionWrapperPtr& session);
+    void failed(const Ice::Exception& ex, const SessionWrapperPtr& session);
+
+private:
+    SessionWrapperPtr mSelf;
+    AsteriskSCF::Media::V1::StreamInformationDict mStreams;
+    bool mIgnore;
+};
+
 class IfStateCriteria
 {
 public:
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index ea9f8fa..1178f5e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -467,6 +467,19 @@ SessionPrx SessionWrapper::getSession() const
     return mSession->session;
 }
 
+SessionControllerPrx SessionWrapper::getSessionController() const
+{
+    boost::shared_lock<boost::shared_mutex> lock(mLock);
+    return mSessionController;
+}
+
+void SessionWrapper::setSessionController(const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    mSessionController = controller;
+}
+
+
 void SessionWrapper::setupMedia()
 {
     mLogger(Debug) << FUNLOG << " for " << mId;
@@ -566,6 +579,16 @@ string SessionWrapper::id()
     return mId;
 }
 
+std::map<std::string, std::string> SessionWrapper::connections()
+{
+    if (mConnector)
+    {
+	return mConnector->connections();
+    }
+
+    return std::map<std::string, std::string>();
+}
+
 void SessionWrapper::setup()
 {
     mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 9ddb43c..0bdef83 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -81,6 +81,16 @@ public:
     AsteriskSCF::SessionCommunications::V1::SessionPrx getSession() const;
 
     /**
+     * Accesses the session controller proxy that the session gave us.
+     */
+    AsteriskSCF::SessionCommunications::V1::SessionControllerPrx getSessionController() const;
+
+    /**
+     * Sets the session controller used to control this session.
+     */
+    void setSessionController(const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx&);
+
+    /**
      * Sets the media connector object. This updates the media pairings associated
      * with this session.
      * Initiates replication.
@@ -114,6 +124,8 @@ public:
 
     std::string id();
 
+    std::map<std::string, std::string> connections();
+
     //
     // Large scale macro operations. In effect the wrapper becomes a sub-component within the bridge.
     //
@@ -144,6 +156,7 @@ private:
     std::string mId;
     MediaSplicerPtr mSplicer;
     ExecutorPtr mActivities;
+    AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mSessionController;
 
     /**
      * Sends changes to the replication service. This should never occur

-----------------------------------------------------------------------


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list