[asterisk-scf-commits] asterisk-scf/release/routing.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Nov 29 17:02:45 CST 2010


branch "master" has been updated
       via  e971cf16fcd669f6fbec5b48ff2c36aca9e1ad6a (commit)
       via  34ce99c2d1e10d40be12c6066a30a6b0b6ad504e (commit)
      from  9aad91812cb36d4f1f4fc3cc3880b6d1f1756706 (commit)

Summary of changes:
 src/SessionRouter.cpp |  511 ++++++++++++++++++++++++++----------------------
 1 files changed, 277 insertions(+), 234 deletions(-)


- Log -----------------------------------------------------------------
commit e971cf16fcd669f6fbec5b48ff2c36aca9e1ad6a
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Mon Nov 29 16:11:48 2010 -0600

    Addressed review comments from review: CR-ASTSCF-6

diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 1afe420..ce6a266 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -57,9 +57,10 @@ public:
      */
     RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
         mMaxRetries(maxRetries),
-        mRetryInterval(intervalInMilliseconds),
+        mRetryIntervalMilliseconds(intervalInMilliseconds),
         mCounter(0)
     {
+        assert(maxRetries < 0xffff);
     }
 
     /**
@@ -67,19 +68,26 @@ public:
      */
     bool canRetry()
     {
-        return mCounter < mMaxRetries;
+        return mCounter <= mMaxRetries;
     }
 
     /**
-     * Using must call this for each attempt. Applies the delay between calls and does
+     * User must call this after each failed attempt. Applies the delay between calls and does
      * bookkeeping.
      */
     bool retry()
     {
-        lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
-        IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
         ++mCounter;
-        return canRetry();
+        lg(Debug) << "Retrying for the " << mCounter << " time.";
+
+        bool doRetry = canRetry();
+
+        if (doRetry)
+        {
+            IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryIntervalMilliseconds));
+        }
+
+        return doRetry;
     }
 
     /**
@@ -92,7 +100,7 @@ public:
 
 private:
     size_t mMaxRetries;
-    size_t mRetryInterval;
+    size_t mRetryIntervalMilliseconds;
     size_t mCounter;
 };
 
@@ -156,7 +164,7 @@ public: // The following operations are implementations of the SessionListener i
             }
             catch(const Ice::Exception &e)
             {
-                lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what() << std::endl;
+                lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what();
             }
         }
 
@@ -198,7 +206,7 @@ public:
             }
             catch(const Ice::Exception &e)
             {
-                lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what() << std::endl;
+                lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what();
             }
         }
     }
@@ -225,18 +233,18 @@ public:
             sessionsToCall = mSessions;
         }
 
-        lg(Debug) << "unregister() called with " << sessionsToCall.size() << " sessions to remove listener from." << std::endl ;
+        lg(Debug) << "unregister() called with " << sessionsToCall.size() << " sessions to remove listener from.";
 
         for(SessionSeq::iterator s=sessionsToCall.begin(); s != sessionsToCall.end(); ++s)
         {
             try
             {
-                lg(Debug) << "Removing listener from session " << (*s)->ice_toString() << std::endl ;
+                lg(Debug) << "Removing listener from session " << (*s)->ice_toString();
                 (*s)->removeListener(mListenerPrx);
             }
             catch(const Ice::Exception &e)
             {
-                lg(Error) << "Exception removing listener from session " << (*s)->ice_toString() << ". Details: " << e.what() << std::endl;
+                lg(Error) << "Exception removing listener from session " << (*s)->ice_toString() << ". Details: " << e.what();
             }
         }
 
@@ -279,10 +287,24 @@ public:
           mAdapter(adapter)
     {
         Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
-        SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
-        mSessionListener->setProxy(listenerProxy);
 
-        mSessionListener->addSessionAndListen(session);
+        try
+        {
+            SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
+            mSessionListener->setProxy(listenerProxy);
+
+            mSessionListener->addSessionAndListen(session);
+        }
+        catch(...)
+        {
+            try
+            {
+               mAdapter->remove(prx->ice_getIdentity());
+            }
+            catch(...)
+            {
+            }
+        }
     }
 
     SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
@@ -290,12 +312,26 @@ public:
           mAdapter(adapter)
     {
         Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
-        SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
-        mSessionListener->setProxy(listenerProxy);
 
-        for(SessionSeq::iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
+        try
         {
-            mSessionListener->addSessionAndListen(*s);
+            SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
+            mSessionListener->setProxy(listenerProxy);
+
+            for(SessionSeq::iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
+            {
+                mSessionListener->addSessionAndListen(*s);
+            }
+        }
+        catch(...)
+        {
+            try
+            {
+               mAdapter->remove(prx->ice_getIdentity());
+            }
+            catch(...)
+            {
+            }
         }
     }
 
@@ -306,13 +342,13 @@ public:
         // in proper order.
         try
         {
-            lg(Debug) << "About to unregister the listener..." << std::endl ;
+            lg(Debug) << "About to unregister the listener..." ;
 
             mSessionListener->unregister();
         }
         catch(const std::exception& e)
         {
-            lg(Debug) << "Exception unregistering: " << e.what() << std::endl ;
+            lg(Debug) << "Exception unregistering: " << e.what() ;
         }
 
         try
@@ -324,7 +360,7 @@ public:
         }
         catch(const std::exception& e)
         {
-            lg(Debug) << "Exception removing listener from Object Adatper " << e.what() << std::endl ;
+            lg(Debug) << "Exception removing listener from Object Adatper " << e.what() ;
         }
     }
 
@@ -408,7 +444,7 @@ public:
             }
             catch (const Ice::Exception &e)
             {
-                lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what() << std::endl;
+                lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what();
                 // TBD... probably other bridge cleanup needs to be done.
                 throw;
             }
@@ -434,18 +470,18 @@ public:
             {
                 if(!policy.retry())
                 {
-                    lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed "  << policy.maxRetries() << " retries." << std::endl;
+                    lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed "  << policy.maxRetries() << " retries." ;
                     throw;
                 }
             }
             catch(const NotBridged& e)
             {
-                lg(Error) << "getBridge(): session is not bridged."  << std::endl;
+                lg(Error) << "getBridge(): session is not bridged." ;
                 throw e; // rethrow
             }
             catch(const Ice::Exception& e)
             {
-                lg(Error) << "getBridge(): Ice exception getting bridge for session:"  << e.what() << std::endl;
+                lg(Error) << "getBridge(): Ice exception getting bridge for session:"  << e.what();
                 throw e; // rethrow
             }
         }
@@ -454,23 +490,23 @@ public:
     }
 
     /**
-     * Add a session to each of a given set of endpoints, and return a collection of the 
+     * Create a session to each of a given set of endpoints, and return a collection of the 
      * newly added sessions. 
      */
-    SessionSeq addSessionToEndpoints(EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+    SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
     {
         // Add a session
         SessionSeq newSessions;
-        for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
+        for (EndpointSeq::const_iterator e = endpoints.begin(); e != endpoints.end(); ++e)
         {
             try
             {
                 SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
 
                 // Create a session on the destination.
-                lg(Debug) << "addSessionToEndpoints(): Creating a session at destination " << destination;
+                lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
                 SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
-                lg(Debug) << "  Session proxy: " << destSession->ice_toString() << std::endl;
+                lg(Debug) << "  Session proxy: " << destSession->ice_toString() ;
 
                 listener->addSession(destSession);
                 newSessions.push_back(destSession);
@@ -489,7 +525,7 @@ public:
      *   @bridge The bridge whose sessions are to be accessed.
      *   @except An optional session proxy to be excluded from the list of sessions. 
      */
-    SessionSeq getSessionsInBridge(BridgePrx bridge, SessionPrx except=0)
+    SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except=0)
     {
         SessionSeq sessions; 
         try
@@ -511,7 +547,7 @@ public:
         }
         catch(const Ice::Exception &e)
         {
-            lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what() << std::endl;
+            lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
             throw e; // rethrow
         }
         return sessions;
@@ -577,7 +613,7 @@ void SessionRouter::setBridgeManagerAccessor(const BridgeManagerAccessorPtr& bri
 void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const std::string& destination,
     const Ice::Current& current)
 {
-    lg(Debug) << "routeSession() entered with destination " << destination << std::endl;
+    lg(Debug) << "routeSession() entered with destination " << destination ;
 
     // Create a listener for the source to handle early termination.
     // The wrapper we're using will remove the listener and free it when
@@ -585,11 +621,11 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
     SessionListenerAllocator listener(mImpl->mAdapter, source);
 
     // Route the destination
-    lg(Debug) << "routeSession(): Routing destination " << destination << std::endl;
+    lg(Debug) << "routeSession(): Routing destination " << destination;
     EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
 
     // Add a session to the endpoints. 
-    SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
+    SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
 
     if (listener->getNumSessions() < 2)
     {
@@ -620,7 +656,7 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
     }
     catch (const Ice::Exception &e)
     {
-        lg(Debug) << "routeSession(): Exception creating bridge: " << e.what() << std::endl;
+        lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
         listener->unregister();
 
         throw BridgingException(source->getEndpoint()->getId(), destination);
@@ -643,7 +679,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
     const ::std::string& destination,
     const Ice::Current& current)
 {
-    lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination << std::endl;
+    lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination;
 
     BridgePrx bridge(sessionToReplace->getBridge());
 
@@ -660,7 +696,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
     EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
 
     // Add a session 
-    SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
+    SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
 
     if (listener->getNumSessions() < 2)
     {
@@ -712,7 +748,7 @@ void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
     const SessionPrx& bridgedSession,
     const Ice::Current&)
 {
-    lg(Debug) << "connectBridgedSessions() entered... " << std::endl;
+    lg(Debug) << "connectBridgedSessions() entered... ";
 
     // Get the bridge being merged into.
     BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);

commit 34ce99c2d1e10d40be12c6066a30a6b0b6ad504e
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu Nov 18 18:18:40 2010 -0600

    Refactored redundant code into reusable methods. Cleanup from some
    late-night fixes leading up to AstriCon.

diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 9aec3aa..1afe420 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -43,16 +43,68 @@ namespace AsteriskSCF
 namespace BasicRoutingService
 {
 
-class SessionListenerImpl : public SessionListener
+/**
+ * A simple utility to make the retry process a little cleaner until a SmartProxy
+ * or other such mechanism has this functionality built-in. 
+ */
+class RetryPolicy
 {
 public:
-    SessionListenerImpl(Ice::ObjectAdapterPtr adapter, const SessionPrx& session) :
-        mAdapter(adapter), mTerminated(false), mListenerPrx(0)
+    /**
+     * Constructor:
+     *  @param maxRetries Maximum number of times to retry. 
+     *  @param intervalInMilliseconds Will sleep this amount in the retry() method. 
+     */
+    RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
+        mMaxRetries(maxRetries),
+        mRetryInterval(intervalInMilliseconds),
+        mCounter(0)
+    {
+    }
+
+    /**
+     * Indicates whether additional retries are warranted. 
+     */
+    bool canRetry()
+    {
+        return mCounter < mMaxRetries;
+    }
+
+    /**
+     * Using must call this for each attempt. Applies the delay between calls and does
+     * bookkeeping.
+     */
+    bool retry()
     {
+        lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
+        IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
+        ++mCounter;
+        return canRetry();
     }
 
-    SessionListenerImpl(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence) :
-        mAdapter(adapter), mTerminated(false), mListenerPrx(0)
+    /**
+     * Accessor for the number of retries allowed. 
+     */
+    size_t maxRetries()
+    {
+        return mMaxRetries;
+    }
+
+private:
+    size_t mMaxRetries;
+    size_t mRetryInterval;
+    size_t mCounter;
+};
+
+/**
+ * Listener used to monitor sessions during the routing process. Primarily used to 
+ * ensure the relevant sessions haven't stopped prior to being bridged. 
+ */
+class SessionListenerImpl : public SessionListener
+{
+public:
+    SessionListenerImpl() :
+        mTerminated(false), mListenerPrx(0)
     {
     }
 
@@ -60,6 +112,8 @@ public:
     {
     }
 
+public: // The following operations are implementations of the SessionListener interface.
+
     void connected(const SessionPrx& session, const Ice::Current&)
     {
     }
@@ -112,10 +166,12 @@ public:
     {
     }
 
+public: 
+
     /**
      * Adds a session to be tracked by the listener. This operation doesn't actually call addListener on the Session.
      * When we ask an endpoint to create a session, we pass our listener in to the creation process.
-     * So the listener has been attached to the session, but we just need to keep track of it in this object.
+     * So the listener has been attached to the session, but we want to keep track of it in this object.
      */
     void addSession(SessionPrx session)
     {
@@ -204,7 +260,6 @@ public:
 private:
     boost::shared_mutex mLock;
 
-    Ice::ObjectAdapterPtr mAdapter;
     SessionSeq mSessions;
     bool mTerminated;
     SessionListenerPrx mListenerPrx;
@@ -220,7 +275,7 @@ class SessionListenerAllocator
 {
 public:
     SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
-        : mSessionListener(new SessionListenerImpl(adapter, session)),
+        : mSessionListener(new SessionListenerImpl()),
           mAdapter(adapter)
     {
         Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -231,7 +286,7 @@ public:
     }
 
     SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
-        : mSessionListener(new SessionListenerImpl(adapter, sessionSequence)),
+        : mSessionListener(new SessionListenerImpl()),
           mAdapter(adapter)
     {
         Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -283,6 +338,9 @@ private:
     Ice::ObjectAdapterPtr mAdapter;
 };
 
+/**
+ * Private operations and state of the SessionRouter. 
+ */
 class SessionRouterPriv
 {
 public:
@@ -298,11 +356,17 @@ public:
     {
     }
 
+    /**
+     * Set the accessor to the bridge. 
+     */
     void setBridgeAccessor(BridgeManagerAccessorPtr bridgeAccessor)
     {
         mBridgeManagerAccessor = bridgeAccessor;
     }
 
+    /**
+     * Do a lookup of the requested endpoint. 
+     */
     EndpointSeq lookupEndpoints(const std::string& destination, const Ice::Current& current)
     {
         EndpointSeq endpoints;
@@ -331,6 +395,9 @@ public:
     }
 
 
+    /**
+     * Forward the start() operation to all sessions in a given sequence. 
+     */
     void forwardStart(SessionSeq& sessions)
     {
         for (SessionSeq::iterator s = sessions.begin(); s != sessions.end(); ++s)
@@ -348,6 +415,138 @@ public:
         }
     }
 
+    /**
+     * Provide access to the bridge for a given session. 
+     */
+    BridgePrx getBridge(SessionPrx session)
+    {
+        BridgePrx result(0);
+
+        RetryPolicy policy(5, 500);
+        while(policy.canRetry())
+        {
+            try
+            {
+                result = session->getBridge();
+                break;
+            }
+            catch(const Ice::ConnectionLostException&)
+            {
+                if(!policy.retry())
+                {
+                    lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed "  << policy.maxRetries() << " retries." << std::endl;
+                    throw;
+                }
+            }
+            catch(const NotBridged& e)
+            {
+                lg(Error) << "getBridge(): session is not bridged."  << std::endl;
+                throw e; // rethrow
+            }
+            catch(const Ice::Exception& e)
+            {
+                lg(Error) << "getBridge(): Ice exception getting bridge for session:"  << e.what() << std::endl;
+                throw e; // rethrow
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Add a session to each of a given set of endpoints, and return a collection of the 
+     * newly added sessions. 
+     */
+    SessionSeq addSessionToEndpoints(EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+    {
+        // Add a session
+        SessionSeq newSessions;
+        for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
+        {
+            try
+            {
+                SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
+
+                // Create a session on the destination.
+                lg(Debug) << "addSessionToEndpoints(): Creating a session at destination " << destination;
+                SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
+                lg(Debug) << "  Session proxy: " << destSession->ice_toString() << std::endl;
+
+                listener->addSession(destSession);
+                newSessions.push_back(destSession);
+            }
+            catch(const Ice::Exception &exception)
+            {
+                lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
+                // We may be able to reach SOME of the endpoints.
+            }
+        }
+        return newSessions;
+    }
+    
+    /**
+     * Accessor for the sessions in a bridge.
+     *   @bridge The bridge whose sessions are to be accessed.
+     *   @except An optional session proxy to be excluded from the list of sessions. 
+     */
+    SessionSeq getSessionsInBridge(BridgePrx bridge, SessionPrx except=0)
+    {
+        SessionSeq sessions; 
+        try
+        {
+            SessionSeq allSessions = bridge->listSessions();
+
+            if (except == 0)
+            {
+                return allSessions;
+            }
+
+            for(SessionSeq::iterator s = allSessions.begin(); s !=allSessions.end(); ++s)
+            {
+                if (except->ice_getIdentity() != (*s)->ice_getIdentity())
+                {
+                    sessions.push_back(*s);
+                }
+            }
+        }
+        catch(const Ice::Exception &e)
+        {
+            lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what() << std::endl;
+            throw e; // rethrow
+        }
+        return sessions;
+    }
+
+    /**
+     * Removes sessions from a bridge.
+     *   @param bridge The bridge whose sessions are to be removed.
+     *   @param except The only session to be left in the bridge. 
+     */
+    SessionSeq removeSessionsFromBridge(BridgePrx bridge, SessionPrx except)
+    {
+        SessionSeq removedSessions;
+        try
+        {
+            SessionSeq allSessions = bridge->listSessions();
+            for(SessionSeq::iterator s = allSessions.begin(); s != allSessions.end(); ++s)
+            {
+                if ((*s)->ice_getIdentity() != except->ice_getIdentity())
+                {
+                    removedSessions.push_back(*s);
+                }
+            }
+
+            lg(Debug) << "Removing sessions from bridge." ;
+            bridge->removeSessions(removedSessions);
+        }
+        catch(const Ice::Exception&)
+        {
+            lg(Warning) << "Unable to remove sessions. " ;
+            // We won't give up because of this.
+        }
+        return removedSessions;
+    }
+
 public:
     Ice::ObjectAdapterPtr mAdapter;
     EndpointRegistryPtr mEndpointRegistry;
@@ -389,28 +588,8 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
     lg(Debug) << "routeSession(): Routing destination " << destination << std::endl;
     EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
 
-    // Add a session
-    SessionSeq newSessions;
-    for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
-    {
-        try
-        {
-            SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
-
-            // Create a session on the destination.
-            lg(Debug) << "routeSession(): Creating a session at destination " << destination;
-            SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
-            lg(Debug) << "  Session proxy: " << destSession->ice_toString() << std::endl;
-
-            listener->addSession(destSession);
-            newSessions.push_back(destSession);
-        }
-        catch(const Ice::Exception &exception)
-        {
-            lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
-            // We may be able to reach SOME of the endpoints.
-        }
-    }
+    // Add a session to the endpoints. 
+    SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
 
     if (listener->getNumSessions() < 2)
     {
@@ -421,6 +600,7 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
     {
         throw SourceTerminatedPreBridgingException(source->getEndpoint()->getId());
     }
+
     // We're through listening, and we will probably interfere with the Bridge's functionality if
     // we keep listening.
     listener->unregister();
@@ -465,60 +645,9 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
 {
     lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination << std::endl;
 
-    BridgePrx bridge(0);
-
-    size_t count = 0;
-    bool done = false;
-    size_t sleepInterval = 500; // XXX Make configurable.
-    size_t numberOfRetries = 5;
-    while(!done && count < numberOfRetries)
-    {
-        try
-        {
-            bridge = sessionToReplace->getBridge();
-            done = true;
-        }
-        catch(const Ice::ConnectionLostException&)
-        {
-            IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(sleepInterval));
-            ++count;
-            if(count >= numberOfRetries)
-            {
-                lg(Error) << "connectBridgedSessionsWithDestination(): ConnectionLostException getting bridge, failed "  << numberOfRetries << " retries." << std::endl;
-                throw;
-            }
-        }
-        catch(const NotBridged& e)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): sessionToReplace() is not bridged."  << std::endl;
-            throw e; // rethrow
-        }
-        catch(const Ice::Exception& e)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): Ice exception getting bridge:"  << e.what() << std::endl;
-            throw e; // rethrow
-        }
-    }
-
-    SessionSeq seq;
-    try
-    {
-        seq = bridge->listSessions();
-    }
-    catch(const Ice::Exception &e)
-    {
-        lg(Error) << "Unable to get list of sesssions for bridge in connectBridgedSessionsWithDestination(). " ;
-        throw e; // rethrow
-    }
+    BridgePrx bridge(sessionToReplace->getBridge());
 
-    SessionSeq remainingSessions;
-    for(SessionSeq::iterator s = seq.begin(); s !=seq.end(); ++s)
-    {
-        if (sessionToReplace->ice_getIdentity() != (*s)->ice_getIdentity()) // Don't listen to the session being replaced.
-        {
-            remainingSessions.push_back(*s);
-        }
-    }
+    SessionSeq remainingSessions = mImpl->getSessionsInBridge(bridge, sessionToReplace);
 
     // Create a listener for the sessions not being replaced to handle early termination.
     // The wrapper we're using will remove the listener and free it when
@@ -530,27 +659,8 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
     lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << destination;
     EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
 
-    // Add a session
-    SessionSeq newSessions;
-    for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
-    {
-        try
-        {
-            SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
-
-            // Create a session on the destination.
-            SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
-            listener->addSession(destSession);
-            newSessions.push_back(destSession);
-
-            lg(Debug) << "connectBridgedSessionsWithDestination(): Created session for routed destination " << destination;
-        }
-        catch(const Ice::Exception &exception)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create sessionEndpoint for " << destination << ". " << exception.what();
-            // We may be able to reach SOME of the endpoints.
-        }
-    }
+    // Add a session 
+    SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
 
     if (listener->getNumSessions() < 2)
     {
@@ -584,6 +694,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
 
 } // SessionRouter::connectBridgedSessionsWithDestination(...)
 
+
 /**
  * Replace one session in a Bridge with sessions from another bridge.
  * No routing is actually performed. This operation exists here for consistency,
@@ -604,59 +715,9 @@ void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
     lg(Debug) << "connectBridgedSessions() entered... " << std::endl;
 
     // Get the bridge being merged into.
-    BridgePrx mergeBridge(0);
-
-    size_t count = 0;
-    bool done = false;
-    size_t sleepInterval = 500; // XXX Make configurable.
-    size_t numberOfRetries = 5;
-    while(!done && count < numberOfRetries)
-    {
-        try
-        {
-            mergeBridge = sessionToReplace->getBridge();
-            done = true;
-        }
-        catch(const Ice::ConnectionLostException&)
-        {
-            IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(sleepInterval));
-            ++count;
-            if(count >= numberOfRetries)
-            {
-                lg(Error) << "connectBridgedSessionsWithDestination(): ConnectionLostException getting bridge for sessionToReplace, failed "  << numberOfRetries << " retries." << std::endl;
-                throw;
-            }
-        }
-        catch(const NotBridged& e)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): sessionToReplace() is not bridged."  << std::endl;
-            throw e; // rethrow
-        }
-        catch(const Ice::Exception& e)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): Ice exception getting bridge for sessionToReplace:"  << e.what() << std::endl;
-            throw e; // rethrow
-        }
-    }
-
-    SessionSeq preserveSessions;
-    try
-    {
-        SessionSeq sourceSessions = mergeBridge->listSessions();
-        for(SessionSeq::iterator s = sourceSessions.begin(); s !=sourceSessions.end(); ++s)
-        {
-            if (sessionToReplace->ice_getIdentity() != (*s)->ice_getIdentity())
-            {
-                preserveSessions.push_back(*s);
-            }
-        }
-    }
-    catch(const Ice::Exception &e)
-    {
-        lg(Error) << "connectBridgedSessions(): Unable to get list of sesssions for bridge in connectBridgedSessions(). " ;
-        throw e; // rethrow
-    }
+    BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
 
+    SessionSeq preserveSessions = mImpl->getSessionsInBridge(mergeBridge, sessionToReplace);
 
     // Create a listener for the sessions not being replaced to handle early termination.
     // The wrapper we're using will remove the listener and free it when
@@ -665,63 +726,9 @@ void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
     SessionListenerAllocator listener(mImpl->mAdapter, preserveSessions);
 
     // Get the bridge for the sessions being moved.
+    BridgePrx oldBridge = mImpl->getBridge(bridgedSession);
 
-    BridgePrx oldBridge(0);
-    count = 0;
-    done = false;
-    while(!done && count < numberOfRetries)
-    {
-        try
-        {
-            oldBridge = bridgedSession->getBridge();
-            done = true;
-        }
-        catch(const Ice::ConnectionLostException&)
-        {
-            IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(sleepInterval));
-            ++count;
-            if(count >= numberOfRetries)
-            {
-                lg(Error) << "connectBridgedSessionsWithDestination(): ConnectionLostException getting bridge for bridgedSession, failed "  << numberOfRetries << " retries." << std::endl;
-                throw;
-            }
-        }
-        catch(const NotBridged& e)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): bridgedSession() is not bridged."  << std::endl;
-            throw e; // rethrow
-        }
-        catch(const Ice::Exception& e)
-        {
-            lg(Error) << "connectBridgedSessionsWithDestination(): Ice exception getting bridge for bridgedSession:"  << e.what() << std::endl;
-            throw e; // rethrow
-        }
-    }
-
-    SessionSeq migratingSessions;
-    if (oldBridge != 0)
-    {
-        try
-        {
-            // Remove the sessions being moved from their old bridge.
-            SessionSeq allSessions = oldBridge->listSessions();
-            for(SessionSeq::iterator s = allSessions.begin(); s != allSessions.end(); ++s)
-            {
-                if ((*s)->ice_getIdentity() != bridgedSession->ice_getIdentity())
-                {
-                    migratingSessions.push_back(*s);
-                }
-            }
-
-            lg(Debug) << "connectBridgedSessions(): Removing migrating sessions from bridge." ;
-            oldBridge->removeSessions(migratingSessions);
-        }
-        catch(const Ice::Exception&)
-        {
-            lg(Warning) << "connectBridgedSessions(): Unable to remove sessions in connectBridgedSessions(). " ;
-            // We won't give up because of this.
-        }
-    }
+    SessionSeq migratingSessions = mImpl->removeSessionsFromBridge(oldBridge, bridgedSession);
 
     // Check for early termination by the source.
     if (listener->isTerminated())

-----------------------------------------------------------------------


-- 
asterisk-scf/release/routing.git



More information about the asterisk-scf-commits mailing list