[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "transfer" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Oct 13 06:58:22 CDT 2010


branch "transfer" has been updated
       via  c0f4738d47d5cc17c7c37b9a7b4f2ac24cf5d6a3 (commit)
      from  20f7a25fede8398a030a9871bd93634822b3f2f6 (commit)

Summary of changes:
 slice              |    2 +-
 src/BridgeImpl.cpp |  119 ++++++++++++++++++++++++++++++++++-----------------
 src/BridgeImpl.h   |    4 +-
 3 files changed, 82 insertions(+), 43 deletions(-)


- Log -----------------------------------------------------------------
commit c0f4738d47d5cc17c7c37b9a7b4f2ac24cf5d6a3
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Oct 13 09:27:28 2010 -0230

    Updating bridge implementation to coincide with recent slice changes.

diff --git a/slice b/slice
index 7e63131..b8ee0db 160000
--- a/slice
+++ b/slice
@@ -1 +1 @@
-Subproject commit 7e63131ea251461c0f4a4d751841601b107452bd
+Subproject commit b8ee0dbd90b5d9425b5add8f9c8e147bf5be0d59
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 1a0d47d..ed5646f 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -30,6 +30,23 @@ using namespace AsteriskSCF::System::Logging;
 namespace
 {
 Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
+
+void checkSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+{
+    Ice::LongSeq invalidIndexes; 
+    Ice::Long index = 0;
+    for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i, ++index)
+    {
+        if(*i == 0)
+        {
+            invalidIndexes.push_back(index);
+        }
+    }
+    if(invalidIndexes.size() > 0)
+    {
+        throw AsteriskSCF::SessionCommunications::V1::InvalidSessions(invalidIndexes);
+    }
+}
 }
 
 //
@@ -273,19 +290,7 @@ void AsteriskSCF::BridgeService::BridgeImpl::addSessions(const AsteriskSCF::Sess
     {
         return;
     }
-    unsigned long index = 0;
-    Ice::LongSeq badIndexes;
-    for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i, ++index)
-    {
-        if(*i == 0)
-        {
-            badIndexes.push_back(index);
-        }
-    }
-    if(badIndexes.size() > 0)
-    {
-        throw AsteriskSCF::SessionCommunications::V1::InvalidSessions(badIndexes);
-    }
+    checkSessions(sessions);
     AsteriskSCF::SessionCommunications::V1::SessionSeq addedSessions;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -332,19 +337,7 @@ void AsteriskSCF::BridgeService::BridgeImpl::removeSessions(const AsteriskSCF::S
     {
         return;
     }
-    unsigned long index = 0;
-    Ice::LongSeq badIndexes;
-    for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i, ++index)
-    {
-        if(*i == 0)
-        {
-            badIndexes.push_back(index);
-        }
-    }
-    if(badIndexes.size() > 0)
-    {
-        throw AsteriskSCF::SessionCommunications::V1::InvalidSessions(badIndexes);
-    }
+    checkSessions(sessions);
     AsteriskSCF::SessionCommunications::V1::SessionSeq removedSessions;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -444,12 +437,13 @@ void AsteriskSCF::BridgeService::BridgeImpl::removeListener(const AsteriskSCF::S
     mListeners->removeListener(listener);
 }
 
-void AsteriskSCF::BridgeService::BridgeImpl::replaceSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& newSession, 
-  const AsteriskSCF::SessionCommunications::V1::SessionPrx& oldSession, const Ice::Current&)
+void AsteriskSCF::BridgeService::BridgeImpl::replaceSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& oldSession, 
+  const AsteriskSCF::SessionCommunications::V1::SessionSeq& newSessions, const Ice::Current&)
 {
     BridgeSessionPtr toRemove;
-    BridgeSessionPtr newBridgeMember;
-    AsteriskSCF::SessionCommunications::V1::SessionInfoPtr  info;
+    std::vector<BridgeSessionPtr> newMembers;
+
+    checkSessions(newSessions);
     {
         //
         // TODO:Need to find a way to make this as exception safe as possible!
@@ -457,26 +451,70 @@ void AsteriskSCF::BridgeService::BridgeImpl::replaceSession(const AsteriskSCF::S
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         statePreCheck();
 
-        info = newSession->setBridge(mPrx, mSessionListenerPrx);
 
         std::vector<BridgeSessionPtr>::iterator i = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(oldSession));
+
         //
-        // If it's found, disconnect the media and remove association between bridge and session.
+        // We need to disconnect the media while the bridge is locked because the media splicer does not currently
+        // have any mutex protection of its own.
         //
-        newBridgeMember = new BridgeSession(newSession, 0, false);
         if(i != mSessions.end())
         {
             toRemove = *i;
-            *i = newBridgeMember;
             toRemove->disconnect();
+            mSessions.erase(i);
+        }
+
+        for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
+        {
+            newMembers.push_back(new BridgeSession(*i, 0, false));
+        }
+        //
+        // We don't do anything else with the new members just yet. We are going to release the lock and continue with things.
+        //
+    }
+    std::vector<AsteriskSCF::SessionCommunications::V1::SessionInfoPtr> infoSeq;
+    for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
+    {
+        try
+        {
+            infoSeq.push_back((*i)->setBridge(mPrx, mSessionListenerPrx));
+            newMembers.push_back(new BridgeSession(*i, 0, false));
+            mSessions.push_back(newMembers.back());
+        }
+        catch(const Ice::Exception&)
+        {
+            //
+            // We need to continue if setBridge fails for some reason. Rolling
+            // back the other sessions would be difficult. The bridge does not
+            // know enough to try for system wide consistency. An OTS would
+            // really be required if things like replaceSessions() were to be
+            // atomic.
+            //
         }
-        else
+    }
+    assert(infoSeq.size() == newMembers.size());
+
+    {
+        //
+        // Now we lock again to connect media where necessary.
+        //
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        std::vector<AsteriskSCF::SessionCommunications::V1::SessionInfoPtr>::iterator currentInfo = infoSeq.begin();
+        for(std::vector<BridgeSessionPtr>::iterator i = newMembers.begin(); i != newMembers.end() ; ++i, ++currentInfo)
         {
-            mSessions.push_back(newBridgeMember); 
+            assert(currentInfo != infoSeq.end());
+            if((*currentInfo)->currentState != "ready")
+            {
+                (*i)->setConnector(mSplicer.connect((*i)->getSession()));
+            }
         }
-        newBridgeMember->setConnector(mSplicer.connect(newSession));
     }
 
+    //
+    // The call on the session to disassociate it from the bridge is deferred until after the lock on the bridge has
+    // been released.
+    //
     if(toRemove)
     {
         try
@@ -491,10 +529,11 @@ void AsteriskSCF::BridgeService::BridgeImpl::replaceSession(const AsteriskSCF::S
         }
     }
 
-    AsteriskSCF::SessionCommunications::V1::SessionSeq sessions;
-    sessions.push_back(newSession);
+    //
+    // Now update the listeners.
+    //
+    AsteriskSCF::SessionCommunications::V1::SessionSeq sessions(newSessions);
     mListeners->sessionsAdded(sessions);
-
     if(toRemove)
     {
         sessions.clear();
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 0f7854a..76aae01 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -104,8 +104,8 @@ namespace BridgeService
         void addListener(const AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx& listener, const Ice::Current& current);
         void removeListener(const AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx& listener, const Ice::Current& current);
 
-        void replaceSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& newSession,
-          const AsteriskSCF::SessionCommunications::V1::SessionPrx& oldSession, const Ice::Current& current);
+        void replaceSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& ,
+          const AsteriskSCF::SessionCommunications::V1::SessionSeq& newSessions, const Ice::Current& current);
 
         //
         // Internal methods

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list