[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "sessioncontroller" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Jul 18 18:58:20 CDT 2011


branch "sessioncontroller" has been created
        at  efb149a5e3937f8db08d9e057286f7a99be8fca1 (commit)

- Log -----------------------------------------------------------------
commit efb149a5e3937f8db08d9e057286f7a99be8fca1
Author: Joshua Colp <jcolp at digium.com>
Date:   Mon Jul 18 20:59:27 2011 -0300

    Add streams support for media splicing.

diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 4b404cb..d352fbb 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -107,7 +107,7 @@ class SessionPairing extends ReplicatedStateItem
 {
     string bridgeKey;
     string sessionKey;
-    AsteriskSCF::Media::V1::Session* mediaSession;
+    AsteriskSCF::Media::V1::StreamInformationDict streams;
     MediaPairingSeq incomingMediaPairings;
     MediaPairingSeq outgoingMediaPairings;
 };
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index e4992fb..6263e71 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -68,9 +68,7 @@ typedef vector<IncomingPairing> IncomingPairings;
 struct MediaConnectorBuilder : public IceUtil::Shared
 {
     AsteriskSCF::SessionCommunications::V1::SessionPrx sessionPrx;
-    AsteriskSCF::Media::V1::SessionPrx mediaSession;
-    AsteriskSCF::Media::V1::StreamSourceSeq sources;
-    AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+    AsteriskSCF::Media::V1::StreamInformationDict streams;
     MediaConnectorIPtr peer;
     MediaConnectorIPtr connector;
     OutgoingPairings outgoingPairings;
@@ -91,7 +89,7 @@ public:
             const std::string& sessionId,
             const ReplicatorSmartPrx& replicator,
             const Logger& logger) :
-        mMedia(data->mediaSession),
+        mStreams(data->streams),
         mOutgoing(data->outgoingPairings),
         mIncoming(data->incomingPairings),
         mPeer(data->peer), 
@@ -111,7 +109,7 @@ public:
             const MediaConnectorPtr& peer,
             const ReplicatorSmartPrx& replicator,
             const Logger& logger) :
-        mMedia(pairing->mediaSession),
+        mStreams(pairing->streams),
         mPeer(peer),
         mConnected(true),
         mBridgeId(pairing->bridgeKey),
@@ -184,7 +182,7 @@ public:
         if (mReplicator)
         {
             SessionPairingPtr currentState(new SessionPairing);
-            currentState->mediaSession = mMedia;
+            currentState->streams = mStreams;
             currentState->bridgeKey = mBridgeId;
             currentState->sessionKey = mSessionId;
             currentState->key = mKey;
@@ -230,9 +228,13 @@ public:
     void destroy()
     {
         SessionPairingPtr newState;
+	vector<OutgoingPairing> outgoing;
+	vector<IncomingPairing> incoming;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
+	    outgoing = mOutgoing;
             mOutgoing.clear();
+	    incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
             newState = createUpdate();
@@ -247,37 +249,32 @@ public:
         // it's media session constituents to stop communicating with anything
         // else. This also used to try one-ways, that's probably a bad idea too.
         //
-        try
+	for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+             pairing != outgoing.end();
+             ++pairing)
         {
-            StreamSourceSeq sources = mMedia->getSources();
-            for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+            try
             {
-                try
-                {
-//                    (*i)->setSink(StreamSinkPrx());
-// TODO: call removeSink
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                pairing->first->setSource(StreamSourcePrx());
             }
-            StreamSinkSeq sinks = mMedia->getSinks();
-            for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+            catch (const Ice::Exception& ex)
             {
-                try
-                {
-                    (*i)->setSource(StreamSourcePrx());
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
             }
         }
-        catch (const Ice::Exception& ex)
+
+        for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+             pairing != incoming.end();
+             ++pairing)
         {
-            mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            try
+            {
+                pairing->first->removeSink(pairing->second);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
         }
     }
 
@@ -301,9 +298,13 @@ public:
     {
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
         SessionPairingPtr newState;
+        vector<OutgoingPairing> outgoing;
+        vector<IncomingPairing> incoming;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
+            outgoing = mOutgoing;
             mOutgoing.clear();
+            incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
             newState = createUpdate();
@@ -318,37 +319,32 @@ public:
         // it's media session constituents to stop communicating with anything
         // else. This also used to try one-ways, that's probably a bad idea too.
         //
-        try
+        for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+             pairing != outgoing.end();
+             ++pairing)
         {
-            StreamSourceSeq sources = mMedia->getSources();
-            for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+            try
             {
-                try
-                {
-//                    (*i)->setSink(StreamSinkPrx());
-// TODO: Call removeSink
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                pairing->first->setSource(StreamSourcePrx());
             }
-            StreamSinkSeq sinks = mMedia->getSinks();
-            for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+            catch (const Ice::Exception& ex)
             {
-                try
-                {
-                    (*i)->setSource(StreamSourcePrx());
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-                }
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
             }
         }
-        catch (const Ice::Exception& ex)
+
+        for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+             pairing != incoming.end();
+             ++pairing)
         {
-            mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            try
+            {
+                pairing->first->removeSink(pairing->second);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
         }
 
         return true;
@@ -361,9 +357,9 @@ public:
 
 private:
     boost::shared_mutex mLock;
-    AsteriskSCF::Media::V1::SessionPrx mMedia;
     vector<OutgoingPairing> mOutgoing;
     vector<IncomingPairing> mIncoming;
+    StreamInformationDict mStreams;
     MediaConnectorPtr mPeer;
     bool mConnected;
     string mBridgeId;
@@ -408,7 +404,7 @@ public:
         }
         connector->initialUpdate();
         boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+        mSessions.push_back(MediaSessionStruct(data->streams, connector));
         return connector;
     }
 
@@ -479,106 +475,64 @@ public:
             existing->clearConnections();
         }
         result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
-        mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+        mSessions.push_back(MediaSessionStruct(pairings->streams, result));
         return result;
     }
 
-    vector<OutgoingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSinkSeq& sinks)
+    void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+                                vector<IncomingPairing>& incoming)
     {
-        vector<StreamSourceSeq> allSources;
-
+        // If no streams are present we can not establish any pairings
+        if (streams.empty())
         {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            //
-            // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
-            // will come into a mix and go out.
-            //
-            for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
-            {
-                if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
-                {
-                    continue;
-                }
-                allSources.push_back(i->mediaSession->getSources());
-            }
+            return;
         }
 
-        vector<OutgoingPairing> result;
-        for (StreamSinkSeq::const_iterator sink = sinks.begin(); sink != sinks.end(); ++sink)
-        {
-            bool connected = false;
-            for (vector<StreamSourceSeq>::iterator sources = allSources.begin(); sources != allSources.end() && !connected; 
-                ++sources)
-            {
-                for (StreamSourceSeq::iterator source = sources->begin(); source != sources->end(); ++source)
-                {
-                    if (canAdapt(*sink, *source))
-                    {
-                        result.push_back(OutgoingPairing(*sink, *source));
-                        sources->erase(source);
-                        connected = true;
-                        break;
-                    }
-                }
-            }
-            if (!connected)
-            {
-                //
-                // TODO: We couldn't establish a proper connection for this sink. What happens
-                // here is probably a policy thing.
-                //
-            }
-        }
-        return result;
-    }
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
 
-    vector<IncomingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSourceSeq& sources)
-    {
-        vector<StreamSinkSeq> allSinks;
+        for (MediaSessions::const_iterator i = mSessions.begin(); i != mSessions.end(); ++i)
         {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            //
-            // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
-            // will come into a mix and go out.
-            //
-            for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+            // If no streams are present go ahead and skip it
+            if (i->streams.empty())
             {
-                if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
-                {
-                    continue;
-                }
-                allSinks.push_back(i->mediaSession->getSinks());
+                continue;
             }
-        }
 
-        vector<IncomingPairing> result;
-        for (StreamSourceSeq::const_iterator source = sources.begin(); source != sources.end(); ++source)
-        {
-            bool connected = false;
-            for (vector<StreamSinkSeq>::iterator sinks = allSinks.begin(); sinks != allSinks.end() && !connected; ++sinks)
+            // Create a copy so we can remove streams once pairings have been established for them
+            StreamInformationDict theirStreams = i->streams;
+
+            // Iterate our own streams looking for any that can be connected to this session
+            for (StreamInformationDict::const_iterator stream = streams.begin();
+                 stream != streams.end();
+                 ++stream)
             {
-                for (StreamSinkSeq::iterator sink = sinks->begin(); sink != sinks->end(); ++sink)
+                for (StreamInformationDict::const_iterator theirStream = theirStreams.begin();
+                     theirStream != theirStreams.end();
+                     ++theirStream)
                 {
-                    if (canAdapt(*source, *sink))
+                    // If this is us skip all streams
+                    if (stream->second->sinks.front() == theirStream->second->sinks.front())
                     {
-                        result.push_back(IncomingPairing(*source, *sink));
-                        sink = sinks->erase(sink);
-                        connected = true;
                         break;
                     }
+                    
+                    // Is this stream compatible with the other stream?
+                    if (haveCommonFormats(stream->second, theirStream->second) == false)
+                    {
+                        continue;
+                    }
+
+		    // Connect sources and sinks together, right now we only support the first
+		    outgoing.push_back(OutgoingPairing(stream->second->sinks.front(), theirStream->second->sources.front()));
+		    incoming.push_back(IncomingPairing(stream->second->sources.front(), theirStream->second->sinks.front()));
+
+                    theirStreams.erase(theirStream->first);
+                    break;
                 }
             }
-            if (!connected)
-            {
-                //
-                // TODO: We couldn't establish a proper connection for this source! What happens now
-                // should be a matter of policy.
-                //
-            }
         }
-        return result;
     }
-    
+
 private:
 
     boost::shared_mutex mLock;
@@ -589,11 +543,11 @@ private:
 
     struct MediaSessionStruct
     {
-        SessionPrx mediaSession;
+        StreamInformationDict streams;
         MediaConnectorIPtr connector;
 
-        MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
-            mediaSession(session),
+        MediaSessionStruct(const StreamInformationDict& strms, const MediaConnectorIPtr& con) :
+            streams(strms),
             connector(con)
         {
         }
@@ -603,15 +557,6 @@ private:
 
     MediaSessions mSessions;
 
-    bool needMixing()
-    {
-        //
-        // TODO: We are only supporting single stream with two endpoints for the moment, so no mixing will
-        // be required.
-        //
-        return false;
-    }
-
     bool hasPreferredCodecOverride()
     {
         //
@@ -622,93 +567,50 @@ private:
         return false;
     }
 
-    bool canAdapt(const Media::V1::StreamSourcePrx&, const Media::V1::StreamSinkPrx&)
-    {
-        //
-        // For the moment we are assuming two streams that can definitely talk to each other.
-        //
-        return true;
-    }
-
-    bool canAdapt(const Media::V1::StreamSinkPrx&, const Media::V1::StreamSourcePrx&)
+    bool canAdapt(const StreamInformationPtr&, const StreamInformationPtr&)
     {
-        //
-        // For the moment we are assuming two streams that can definitely talk to each other.
-        //
-        return true;
-    }
-};
-
-class GetMediaSession : public QueuedTask
-{
-public:
-    GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
-        QueuedTask("GetMediaSession"),
-        mSession(session),
-        mMaterials(materials)
-    {
-    }
-
-protected:
-    bool executeImpl()
-    {
-        mMaterials->sessionPrx->begin_getMediaSession(
-            AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
-                        &GetMediaSession::done, &GetMediaSession::failed));
+        // For the moment we can not transcode
         return false;
     }
 
-    void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+    bool haveCommonFormats(const StreamInformationPtr& stream0, const StreamInformationPtr& stream1)
     {
-        mMaterials->mediaSession = mediaSession;
-        mListener->succeeded();
-    }
+	for (FormatSeq::const_iterator format0 = stream0->formats.begin();
+             format0 != stream0->formats.end();
+             ++format0)
+        {
+            for (FormatSeq::const_iterator format1 = stream1->formats.begin();
+                 format1 != stream1->formats.end();
+                 ++format1)
+            {
+                // Is the name alike?
+                if ((*format0)->name != (*format1)->name)
+                {
+                    continue;
+                }
 
-    void failed(const Ice::Exception&)
-    {
-        mListener->failed();
-    }
-private:
-    SessionWrapperPtr mSession;
-    MediaConnectorBuilderPtr mMaterials;
-};
+                // Is a format operations service available, and if so are they compatible?
+                if ((*format0)->operations &&
+                    (((*format0)->operations != (*format1)->operations ||
+                      ((*format0)->operations->checkCompatible((*format0), (*format1)) == false))))
+                {
+                    continue;
+                }
 
-class GetSinks : public QueuedTask
-{
-public:
-    GetSinks(const MediaConnectorBuilderPtr& materials) :
-        QueuedTask("GetSinks"),
-        mMaterials(materials)
-    {
-    }
+                return true;
+            }
+        }
 
-protected:
-    bool executeImpl()
-    {
-        mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
-                        &GetSinks::done, &GetSinks::failed));
         return false;
     }
-
-    void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
-    {
-        mMaterials->sinks = sinks;
-        mListener->succeeded();
-    }
-
-    void failed(const Ice::Exception&)
-    {
-        mListener->failed();
-    }
-private:
-    MediaConnectorBuilderPtr mMaterials;
 };
 
-class GetSources : public QueuedTask
+class GetStreams : public QueuedTask
 {
 public:
-    GetSources(const MediaConnectorBuilderPtr& materials) :
-        QueuedTask("GetSources"),
+    GetStreams(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("GetStreams"),
+        mSession(session),
         mMaterials(materials)
     {
     }
@@ -716,14 +618,15 @@ public:
 protected:
     bool executeImpl()
     {
-        mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
-                        &GetSources::done, &GetSources::failed));
+        mMaterials->sessionPrx->begin_getStreams(
+            AsteriskSCF::SessionCommunications::V1::newCallback_Session_getStreams(this,
+                        &GetStreams::done, &GetStreams::failed));
         return false;
     }
 
-    void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+    void done(const AsteriskSCF::Media::V1::StreamInformationDict& streams)
     {
-        mMaterials->sources = sources;
+        mMaterials->streams = streams;
         mListener->succeeded();
     }
 
@@ -731,8 +634,8 @@ protected:
     {
         mListener->failed();
     }
-    
 private:
+    SessionWrapperPtr mSession;
     MediaConnectorBuilderPtr mMaterials;
 };
 
@@ -749,8 +652,7 @@ public:
 protected:
     bool executeImpl()
     {
-        mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sources);
-        mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sinks);
+	mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings);
         return true;
     }
 
@@ -995,9 +897,7 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
     MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
     materials->peer = peer;
     materials->sessionPrx = sessionPrx;
-    tasks.push_back(new GetMediaSession(session, materials));
-    tasks.push_back(new GetSinks(materials));
-    tasks.push_back(new GetSources(materials));
+    tasks.push_back(new GetStreams(session, materials));
     tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
     tasks.push_back(new GetCompatiblePairings(splicer, materials));
     tasks.push_back(new MakeConnections(materials));

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list