[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Jun 6 09:58:52 CDT 2011


branch "master" has been updated
       via  6595950f2059c14dc1dbc40f3ba76d4a27d72fd5 (commit)
      from  b1411d90beaf59d710f87311c0471b57c29ab71f (commit)

Summary of changes:
 src/MediaSplicer.cpp    |  176 +++++++++++++++-------------------------------
 src/SessionListener.cpp |   11 +++-
 2 files changed, 68 insertions(+), 119 deletions(-)


- Log -----------------------------------------------------------------
commit 6595950f2059c14dc1dbc40f3ba76d4a27d72fd5
Author: Brent Eagles <beagles at digium.com>
Date:   Mon Jun 6 12:17:40 2011 -0230

    Simplify disconnection of the media splicing. To avoid race conditions, a
    disconnection should not affect any of the other media sessions. The other
    media sessions will be progressively altered either by internal bridge
    operations or by the owning session itself.
    
    Modified the listener to include a proactive reap to clean out disconnected
    sessions.

diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 7812b64..1a631b6 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -133,36 +133,7 @@ public:
     void unplug()
     {
         mLogger(Debug) << FUNLOG << ": called.";
-        bool hadMedia = disconnectMedia();
-        if (!hadMedia)
-        {
-            mLogger(Debug) << FUNLOG << ": we did not have any media. Contacting the peer connector and telling it to disconnect!";
-            MediaConnectorPtr peer;
-            {
-                boost::unique_lock<boost::shared_mutex> lock(mLock);
-                peer = mPeer;
-                mPeer = 0;
-            }
-            if (peer)
-            {
-                peer->disconnectMedia();
-            }
-        }
-        else
-        {
-            mLogger(Debug) << FUNLOG << ": media connections unplugged. Let's forget about our peer connector now!";
-            MediaConnectorPtr peer;
-            {
-                boost::unique_lock<boost::shared_mutex> lock(mLock);
-                peer = mPeer;
-                mPeer = 0;
-            }
-            if (peer)
-            {
-                peer->clearConnections();
-            }
-            mPeer = 0;
-        }
+        disconnectMedia();
         mLogger(Debug) << FUNLOG << ": finished unplugging.";
         if (mReplicator)
         {
@@ -258,71 +229,54 @@ public:
     //
     void destroy()
     {
-        vector<OutgoingPairing> outgoing;
-        vector<IncomingPairing> incoming;
         SessionPairingPtr newState;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
-            outgoing = mOutgoing;
             mOutgoing.clear();
-            incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
             newState = createUpdate();
         }
         pushUpdate(newState);
-        if (outgoing.size() == 0 && incoming.size() == 0)
-        {
-            return;
-        }
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources";
 
         //
-        // Disconnect everybody, eating up exceptions in case things have gone away. This is a perfect spot
-        // for oneways. We don't care about errors when destroying. We still catch Ice::Exceptions though since
-        // a connection might be attempted, even on behalf of a oneway.
+        // NOTE: we used to talk to the peer and disconnect it too, but really
+        // that is fraught with danger when using non-synchronous or non-serial
+        // RPCs. We take the safe road and allow each connector to simply tell
+        // it's media session constituents to stop communicating with anything
+        // else. This also used to try one-ways, that's probably a bad idea too.
         //
-        for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
+        try
         {
-            mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and "
-                           << i->first->ice_toString();
-
-            try
+            StreamSourceSeq sources = mMedia->getSources();
+            for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
             {
-                tryOneWay(i->second)->setSink(0);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-            }
-            try
-            {
-                tryOneWay(i->first)->setSource(0);
+                try
+                {
+                    (*i)->setSink(StreamSinkPrx());
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+                }
             }
-            catch (const Ice::Exception& ex)
+            StreamSinkSeq sinks = mMedia->getSinks();
+            for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+                try
+                {
+                    (*i)->setSource(StreamSourcePrx());
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+                }
             }
         }
-        for (vector<IncomingPairing>::iterator i = incoming.begin(); i != incoming.end(); ++i)
+        catch (const Ice::Exception& ex)
         {
-            mLogger(Debug) << FUNLOG << ": disconnecting " << i->first->ice_toString() << " and " << i->second->ice_toString();
-            try
-            {
-                tryOneWay(i->first)->setSink(0);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-            }
-            try
-            {
-                tryOneWay(i->second)->setSource(0);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-            }
+            mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
         }
     }
 
@@ -345,70 +299,56 @@ public:
     bool disconnectMedia()
     {
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
-        vector<OutgoingPairing> outgoing;
-        vector<IncomingPairing> incoming;
         SessionPairingPtr newState;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
-            outgoing = mOutgoing;
             mOutgoing.clear();
-            incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
             newState = createUpdate();
         }
         pushUpdate(newState);
-        if (outgoing.size() == 0 && incoming.size() == 0)
-        {
-            return false;
-        }
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources";
 
         //
-        // TODO: Disconnect everybody, eating up exceptions in case things have gone away. This is a perfect spot for
-        // oneways or, at the very least, AMI.
+        // NOTE: we used to talk to the peer and disconnect it too, but really
+        // that is fraught with danger when using non-synchronous or non-serial
+        // RPCs. We take the safe road and allow each connector to simply tell
+        // it's media session constituents to stop communicating with anything
+        // else. This also used to try one-ways, that's probably a bad idea too.
         //
-        for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
+        try
         {
-            mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and " << i->first->ice_toString();
-
-            try
-            {
-                i->second->setSink(0);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-            }
-            try
+            StreamSourceSeq sources = mMedia->getSources();
+            for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
             {
-                i->first->setSource(0);
+                try
+                {
+                    (*i)->setSink(StreamSinkPrx());
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+                }
             }
-            catch (const Ice::Exception& ex)
+            StreamSinkSeq sinks = mMedia->getSinks();
+            for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
             {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+                try
+                {
+                    (*i)->setSource(StreamSourcePrx());
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+                }
             }
         }
-        for (vector<IncomingPairing>::iterator i = incoming.begin(); i != incoming.end(); ++i)
+        catch (const Ice::Exception& ex)
         {
-            mLogger(Debug) << FUNLOG << ": disconnecting " << i->first->ice_toString() << " and " << i->second->ice_toString();
-            try
-            {
-                i->first->setSink(0);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-            }
-            try
-            {
-                i->second->setSource(0);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
-            }
+            mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
         }
+
         return true;
     }
 
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 71c6d4d..bf5b450 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -183,6 +183,16 @@ public:
             // A null response code pointer will prevent the stop indication from being sent back to this session.
 	    //
 	    session->shutdown(mListenerPrx, ResponseCodePtr());
+            mSessions->removeSession(session->getBridgedSession());
+
+            //
+            // It's a good idea to reap at this time to avoid having processes
+            // that might be disconnected from being considered in the
+            // visitSessions() run.  TODO: Some operations shouldn't be
+            // performed on sessions in certain states. It's really the
+            // operation's responsibility to prevent that from happening.
+            //
+            mSessions->reap();
 
             ShutdownSessionOperation shutdownOp(session->getSession(), mListenerPrx, stopped->response, mLogger);
             mSessions->visitSessions(shutdownOp);
@@ -195,7 +205,6 @@ public:
             {
                 try
                 {
-                    mSessions->reap();
                     SessionSeq currentSessions = mSessions->getSessionSeq();
 
                     //

-----------------------------------------------------------------------


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list