[asterisk-scf-commits] asterisk-scf/release/routing.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Nov 29 17:02:45 CST 2010
branch "master" has been updated
via e971cf16fcd669f6fbec5b48ff2c36aca9e1ad6a (commit)
via 34ce99c2d1e10d40be12c6066a30a6b0b6ad504e (commit)
from 9aad91812cb36d4f1f4fc3cc3880b6d1f1756706 (commit)
Summary of changes:
src/SessionRouter.cpp | 511 ++++++++++++++++++++++++++----------------------
1 files changed, 277 insertions(+), 234 deletions(-)
- Log -----------------------------------------------------------------
commit e971cf16fcd669f6fbec5b48ff2c36aca9e1ad6a
Author: Ken Hunt <ken.hunt at digium.com>
Date: Mon Nov 29 16:11:48 2010 -0600
Addressed review comments from review: CR-ASTSCF-6
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 1afe420..ce6a266 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -57,9 +57,10 @@ public:
*/
RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
mMaxRetries(maxRetries),
- mRetryInterval(intervalInMilliseconds),
+ mRetryIntervalMilliseconds(intervalInMilliseconds),
mCounter(0)
{
+ assert(maxRetries < 0xffff);
}
/**
@@ -67,19 +68,26 @@ public:
*/
bool canRetry()
{
- return mCounter < mMaxRetries;
+ return mCounter <= mMaxRetries;
}
/**
- * Using must call this for each attempt. Applies the delay between calls and does
+ * User must call this after each failed attempt. Applies the delay between calls and does
* bookkeeping.
*/
bool retry()
{
- lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
++mCounter;
- return canRetry();
+ lg(Debug) << "Retrying for the " << mCounter << " time.";
+
+ bool doRetry = canRetry();
+
+ if (doRetry)
+ {
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryIntervalMilliseconds));
+ }
+
+ return doRetry;
}
/**
@@ -92,7 +100,7 @@ public:
private:
size_t mMaxRetries;
- size_t mRetryInterval;
+ size_t mRetryIntervalMilliseconds;
size_t mCounter;
};
@@ -156,7 +164,7 @@ public: // The following operations are implementations of the SessionListener i
}
catch(const Ice::Exception &e)
{
- lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what() << std::endl;
+ lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what();
}
}
@@ -198,7 +206,7 @@ public:
}
catch(const Ice::Exception &e)
{
- lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what() << std::endl;
+ lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what();
}
}
}
@@ -225,18 +233,18 @@ public:
sessionsToCall = mSessions;
}
- lg(Debug) << "unregister() called with " << sessionsToCall.size() << " sessions to remove listener from." << std::endl ;
+ lg(Debug) << "unregister() called with " << sessionsToCall.size() << " sessions to remove listener from.";
for(SessionSeq::iterator s=sessionsToCall.begin(); s != sessionsToCall.end(); ++s)
{
try
{
- lg(Debug) << "Removing listener from session " << (*s)->ice_toString() << std::endl ;
+ lg(Debug) << "Removing listener from session " << (*s)->ice_toString();
(*s)->removeListener(mListenerPrx);
}
catch(const Ice::Exception &e)
{
- lg(Error) << "Exception removing listener from session " << (*s)->ice_toString() << ". Details: " << e.what() << std::endl;
+ lg(Error) << "Exception removing listener from session " << (*s)->ice_toString() << ". Details: " << e.what();
}
}
@@ -279,10 +287,24 @@ public:
mAdapter(adapter)
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
- SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
- mSessionListener->setProxy(listenerProxy);
- mSessionListener->addSessionAndListen(session);
+ try
+ {
+ SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
+ mSessionListener->setProxy(listenerProxy);
+
+ mSessionListener->addSessionAndListen(session);
+ }
+ catch(...)
+ {
+ try
+ {
+ mAdapter->remove(prx->ice_getIdentity());
+ }
+ catch(...)
+ {
+ }
+ }
}
SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
@@ -290,12 +312,26 @@ public:
mAdapter(adapter)
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
- SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
- mSessionListener->setProxy(listenerProxy);
- for(SessionSeq::iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
+ try
{
- mSessionListener->addSessionAndListen(*s);
+ SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
+ mSessionListener->setProxy(listenerProxy);
+
+ for(SessionSeq::iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
+ {
+ mSessionListener->addSessionAndListen(*s);
+ }
+ }
+ catch(...)
+ {
+ try
+ {
+ mAdapter->remove(prx->ice_getIdentity());
+ }
+ catch(...)
+ {
+ }
}
}
@@ -306,13 +342,13 @@ public:
// in proper order.
try
{
- lg(Debug) << "About to unregister the listener..." << std::endl ;
+ lg(Debug) << "About to unregister the listener..." ;
mSessionListener->unregister();
}
catch(const std::exception& e)
{
- lg(Debug) << "Exception unregistering: " << e.what() << std::endl ;
+ lg(Debug) << "Exception unregistering: " << e.what() ;
}
try
@@ -324,7 +360,7 @@ public:
}
catch(const std::exception& e)
{
- lg(Debug) << "Exception removing listener from Object Adatper " << e.what() << std::endl ;
+ lg(Debug) << "Exception removing listener from Object Adatper " << e.what() ;
}
}
@@ -408,7 +444,7 @@ public:
}
catch (const Ice::Exception &e)
{
- lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what() << std::endl;
+ lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what();
// TBD... probably other bridge cleanup needs to be done.
throw;
}
@@ -434,18 +470,18 @@ public:
{
if(!policy.retry())
{
- lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed " << policy.maxRetries() << " retries." << std::endl;
+ lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed " << policy.maxRetries() << " retries." ;
throw;
}
}
catch(const NotBridged& e)
{
- lg(Error) << "getBridge(): session is not bridged." << std::endl;
+ lg(Error) << "getBridge(): session is not bridged." ;
throw e; // rethrow
}
catch(const Ice::Exception& e)
{
- lg(Error) << "getBridge(): Ice exception getting bridge for session:" << e.what() << std::endl;
+ lg(Error) << "getBridge(): Ice exception getting bridge for session:" << e.what();
throw e; // rethrow
}
}
@@ -454,23 +490,23 @@ public:
}
/**
- * Add a session to each of a given set of endpoints, and return a collection of the
+ * Create a session to each of a given set of endpoints, and return a collection of the
* newly added sessions.
*/
- SessionSeq addSessionToEndpoints(EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+ SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
{
// Add a session
SessionSeq newSessions;
- for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
+ for (EndpointSeq::const_iterator e = endpoints.begin(); e != endpoints.end(); ++e)
{
try
{
SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
// Create a session on the destination.
- lg(Debug) << "addSessionToEndpoints(): Creating a session at destination " << destination;
+ lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
- lg(Debug) << " Session proxy: " << destSession->ice_toString() << std::endl;
+ lg(Debug) << " Session proxy: " << destSession->ice_toString() ;
listener->addSession(destSession);
newSessions.push_back(destSession);
@@ -489,7 +525,7 @@ public:
* @bridge The bridge whose sessions are to be accessed.
* @except An optional session proxy to be excluded from the list of sessions.
*/
- SessionSeq getSessionsInBridge(BridgePrx bridge, SessionPrx except=0)
+ SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except=0)
{
SessionSeq sessions;
try
@@ -511,7 +547,7 @@ public:
}
catch(const Ice::Exception &e)
{
- lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what() << std::endl;
+ lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
throw e; // rethrow
}
return sessions;
@@ -577,7 +613,7 @@ void SessionRouter::setBridgeManagerAccessor(const BridgeManagerAccessorPtr& bri
void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const std::string& destination,
const Ice::Current& current)
{
- lg(Debug) << "routeSession() entered with destination " << destination << std::endl;
+ lg(Debug) << "routeSession() entered with destination " << destination ;
// Create a listener for the source to handle early termination.
// The wrapper we're using will remove the listener and free it when
@@ -585,11 +621,11 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
SessionListenerAllocator listener(mImpl->mAdapter, source);
// Route the destination
- lg(Debug) << "routeSession(): Routing destination " << destination << std::endl;
+ lg(Debug) << "routeSession(): Routing destination " << destination;
EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
// Add a session to the endpoints.
- SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
+ SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
if (listener->getNumSessions() < 2)
{
@@ -620,7 +656,7 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
}
catch (const Ice::Exception &e)
{
- lg(Debug) << "routeSession(): Exception creating bridge: " << e.what() << std::endl;
+ lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
listener->unregister();
throw BridgingException(source->getEndpoint()->getId(), destination);
@@ -643,7 +679,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
const ::std::string& destination,
const Ice::Current& current)
{
- lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination << std::endl;
+ lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination;
BridgePrx bridge(sessionToReplace->getBridge());
@@ -660,7 +696,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
// Add a session
- SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
+ SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
if (listener->getNumSessions() < 2)
{
@@ -712,7 +748,7 @@ void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
const SessionPrx& bridgedSession,
const Ice::Current&)
{
- lg(Debug) << "connectBridgedSessions() entered... " << std::endl;
+ lg(Debug) << "connectBridgedSessions() entered... ";
// Get the bridge being merged into.
BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
commit 34ce99c2d1e10d40be12c6066a30a6b0b6ad504e
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Nov 18 18:18:40 2010 -0600
Refactored redundant code into reusable methods. Cleanup from some
late-night fixes leading up to AstriCon.
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 9aec3aa..1afe420 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -43,16 +43,68 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
-class SessionListenerImpl : public SessionListener
+/**
+ * A simple utility to make the retry process a little cleaner until a SmartProxy
+ * or other such mechanism has this functionality built-in.
+ */
+class RetryPolicy
{
public:
- SessionListenerImpl(Ice::ObjectAdapterPtr adapter, const SessionPrx& session) :
- mAdapter(adapter), mTerminated(false), mListenerPrx(0)
+ /**
+ * Constructor:
+ * @param maxRetries Maximum number of times to retry.
+ * @intervalInMilliseconds Will sleep this amount in the retry() method.
+ */
+ RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
+ mMaxRetries(maxRetries),
+ mRetryInterval(intervalInMilliseconds),
+ mCounter(0)
+ {
+ }
+
+ /**
+ * Indicates whether additional retries are warrented.
+ */
+ bool canRetry()
+ {
+ return mCounter < mMaxRetries;
+ }
+
+ /**
+ * Using must call this for each attempt. Applies the delay between calls and does
+ * bookkeeping.
+ */
+ bool retry()
{
+ lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
+ ++mCounter;
+ return canRetry();
}
- SessionListenerImpl(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence) :
- mAdapter(adapter), mTerminated(false), mListenerPrx(0)
+ /**
+ * Accessor for the number of retries allowed.
+ */
+ size_t maxRetries()
+ {
+ return mMaxRetries;
+ }
+
+private:
+ size_t mMaxRetries;
+ size_t mRetryInterval;
+ size_t mCounter;
+};
+
+/**
+ * Listener used to monitor sessions during the routing process. Primarily used to
+ * insure the relevant sessions haven't stopped prior to being bridged.
+ */
+class SessionListenerImpl : public SessionListener
+{
+public:
+ SessionListenerImpl() :
+ mTerminated(false), mListenerPrx(0)
{
}
@@ -60,6 +112,8 @@ public:
{
}
+public: // The following operations are implementations of the SessionListener interface.
+
void connected(const SessionPrx& session, const Ice::Current&)
{
}
@@ -112,10 +166,12 @@ public:
{
}
+public:
+
/**
* Adds a session to be tracked by the listener. This operation doesn't actually call addListener on the Session.
* When we ask an endpoint to create a session, we pass our listener in to the creation process.
- * So the listener has been attached to the session, but we just need to keep track of it in this object.
+ * So the listener has been attached to the session, but we want to keep track of it in this object.
*/
void addSession(SessionPrx session)
{
@@ -204,7 +260,6 @@ public:
private:
boost::shared_mutex mLock;
- Ice::ObjectAdapterPtr mAdapter;
SessionSeq mSessions;
bool mTerminated;
SessionListenerPrx mListenerPrx;
@@ -220,7 +275,7 @@ class SessionListenerAllocator
{
public:
SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
- : mSessionListener(new SessionListenerImpl(adapter, session)),
+ : mSessionListener(new SessionListenerImpl()),
mAdapter(adapter)
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -231,7 +286,7 @@ public:
}
SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
- : mSessionListener(new SessionListenerImpl(adapter, sessionSequence)),
+ : mSessionListener(new SessionListenerImpl()),
mAdapter(adapter)
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -283,6 +338,9 @@ private:
Ice::ObjectAdapterPtr mAdapter;
};
+/**
+ * Private operations and state of the SessionRouter.
+ */
class SessionRouterPriv
{
public:
@@ -298,11 +356,17 @@ public:
{
}
+ /**
+ * Set the accessor to the bridge.
+ */
void setBridgeAccessor(BridgeManagerAccessorPtr bridgeAccessor)
{
mBridgeManagerAccessor = bridgeAccessor;
}
+ /**
+ * Do a lookup of the requested endpoint.
+ */
EndpointSeq lookupEndpoints(const std::string& destination, const Ice::Current& current)
{
EndpointSeq endpoints;
@@ -331,6 +395,9 @@ public:
}
+ /**
+ * Forward the start() operation to all sessions in a given sequence.
+ */
void forwardStart(SessionSeq& sessions)
{
for (SessionSeq::iterator s = sessions.begin(); s != sessions.end(); ++s)
@@ -348,6 +415,138 @@ public:
}
}
+ /**
+ * Provide access to the bridge for a given session.
+ */
+ BridgePrx getBridge(SessionPrx session)
+ {
+ BridgePrx result(0);
+
+ RetryPolicy policy(5, 500);
+ while(policy.canRetry())
+ {
+ try
+ {
+ result = session->getBridge();
+ break;
+ }
+ catch(const Ice::ConnectionLostException&)
+ {
+ if(!policy.retry())
+ {
+ lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed " << policy.maxRetries() << " retries." << std::endl;
+ throw;
+ }
+ }
+ catch(const NotBridged& e)
+ {
+ lg(Error) << "getBridge(): session is not bridged." << std::endl;
+ throw e; // rethrow
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Error) << "getBridge(): Ice exception getting bridge for session:" << e.what() << std::endl;
+ throw e; // rethrow
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Add a session to each of a given set of endpoints, and return a collection of the
+ * newly added sessions.
+ */
+ SessionSeq addSessionToEndpoints(EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+ {
+ // Add a session
+ SessionSeq newSessions;
+ for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
+ {
+ try
+ {
+ SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
+
+ // Create a session on the destination.
+ lg(Debug) << "addSessionToEndpoints(): Creating a session at destination " << destination;
+ SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
+ lg(Debug) << " Session proxy: " << destSession->ice_toString() << std::endl;
+
+ listener->addSession(destSession);
+ newSessions.push_back(destSession);
+ }
+ catch(const Ice::Exception &exception)
+ {
+ lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
+ // We may be able to reach SOME of the endpoints.
+ }
+ }
+ return newSessions;
+ }
+
+ /**
+ * Accessor for the sessions in a bridge.
+ * @bridge The bridge whose sessions are to be accessed.
+ * @except An optional session proxy to be excluded from the list of sessions.
+ */
+ SessionSeq getSessionsInBridge(BridgePrx bridge, SessionPrx except=0)
+ {
+ SessionSeq sessions;
+ try
+ {
+ SessionSeq allSessions = bridge->listSessions();
+
+ if (except == 0)
+ {
+ return allSessions;
+ }
+
+ for(SessionSeq::iterator s = allSessions.begin(); s !=allSessions.end(); ++s)
+ {
+ if (except->ice_getIdentity() != (*s)->ice_getIdentity())
+ {
+ sessions.push_back(*s);
+ }
+ }
+ }
+ catch(const Ice::Exception &e)
+ {
+ lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what() << std::endl;
+ throw e; // rethrow
+ }
+ return sessions;
+ }
+
+ /**
+ * Removes sessions from a bridge.
+ * @param bridge The bridge whose sessions are to be removed.
+ * @param except The only session to be left in the bridge.
+ */
+ SessionSeq removeSessionsFromBridge(BridgePrx bridge, SessionPrx except)
+ {
+ SessionSeq removedSessions;
+ try
+ {
+ SessionSeq allSessions = bridge->listSessions();
+ for(SessionSeq::iterator s = allSessions.begin(); s != allSessions.end(); ++s)
+ {
+ if ((*s)->ice_getIdentity() != except->ice_getIdentity())
+ {
+ removedSessions.push_back(*s);
+ }
+ }
+
+ lg(Debug) << "Removing sessions from bridge." ;
+ bridge->removeSessions(removedSessions);
+ }
+ catch(const Ice::Exception&)
+ {
+ lg(Warning) << "Unable to remove sessions. " ;
+ // We won't give up because of this.
+ }
+ return removedSessions;
+ }
+
public:
Ice::ObjectAdapterPtr mAdapter;
EndpointRegistryPtr mEndpointRegistry;
@@ -389,28 +588,8 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
lg(Debug) << "routeSession(): Routing destination " << destination << std::endl;
EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
- // Add a session
- SessionSeq newSessions;
- for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
- {
- try
- {
- SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
-
- // Create a session on the destination.
- lg(Debug) << "routeSession(): Creating a session at destination " << destination;
- SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
- lg(Debug) << " Session proxy: " << destSession->ice_toString() << std::endl;
-
- listener->addSession(destSession);
- newSessions.push_back(destSession);
- }
- catch(const Ice::Exception &exception)
- {
- lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
- // We may be able to reach SOME of the endpoints.
- }
- }
+ // Add a session to the endpoints.
+ SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
if (listener->getNumSessions() < 2)
{
@@ -421,6 +600,7 @@ void SessionRouter::routeSession(const AsteriskSCF::SessionCommunications::V1::S
{
throw SourceTerminatedPreBridgingException(source->getEndpoint()->getId());
}
+
// We're through listening, and we will probably interfere with the Bridge's functionality if
// we keep listening.
listener->unregister();
@@ -465,60 +645,9 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
{
lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination << std::endl;
- BridgePrx bridge(0);
-
- size_t count = 0;
- bool done = false;
- size_t sleepInterval = 500; // XXX Make configurable.
- size_t numberOfRetries = 5;
- while(!done && count < numberOfRetries)
- {
- try
- {
- bridge = sessionToReplace->getBridge();
- done = true;
- }
- catch(const Ice::ConnectionLostException&)
- {
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(sleepInterval));
- ++count;
- if(count >= numberOfRetries)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): ConnectionLostException getting bridge, failed " << numberOfRetries << " retries." << std::endl;
- throw;
- }
- }
- catch(const NotBridged& e)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): sessionToReplace() is not bridged." << std::endl;
- throw e; // rethrow
- }
- catch(const Ice::Exception& e)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): Ice exception getting bridge:" << e.what() << std::endl;
- throw e; // rethrow
- }
- }
-
- SessionSeq seq;
- try
- {
- seq = bridge->listSessions();
- }
- catch(const Ice::Exception &e)
- {
- lg(Error) << "Unable to get list of sesssions for bridge in connectBridgedSessionsWithDestination(). " ;
- throw e; // rethrow
- }
+ BridgePrx bridge(sessionToReplace->getBridge());
- SessionSeq remainingSessions;
- for(SessionSeq::iterator s = seq.begin(); s !=seq.end(); ++s)
- {
- if (sessionToReplace->ice_getIdentity() != (*s)->ice_getIdentity()) // Don't listen to the session being replaced.
- {
- remainingSessions.push_back(*s);
- }
- }
+ SessionSeq remainingSessions = mImpl->getSessionsInBridge(bridge, sessionToReplace);
// Create a listener for the sessions not being replaced to handle early termination.
// The wrapper we're using will remove the listener and free it when
@@ -530,27 +659,8 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << destination;
EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
- // Add a session
- SessionSeq newSessions;
- for (EndpointSeq::iterator e = endpoints.begin(); e != endpoints.end(); ++e)
- {
- try
- {
- SessionEndpointPrx sessionEndpoint = SessionEndpointPrx::checkedCast(*e);
-
- // Create a session on the destination.
- SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
- listener->addSession(destSession);
- newSessions.push_back(destSession);
-
- lg(Debug) << "connectBridgedSessionsWithDestination(): Created session for routed destination " << destination;
- }
- catch(const Ice::Exception &exception)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create sessionEndpoint for " << destination << ". " << exception.what();
- // We may be able to reach SOME of the endpoints.
- }
- }
+ // Add a session
+ SessionSeq newSessions = mImpl->addSessionToEndpoints(endpoints, destination, listener);
if (listener->getNumSessions() < 2)
{
@@ -584,6 +694,7 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
} // SessionRouter::connectBridgedSessionsWithDestination(...)
+
/**
* Replace one session in a Bridge with sessions from another bridge.
* No routing is actually performed. This operation exists here for consistency,
@@ -604,59 +715,9 @@ void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
lg(Debug) << "connectBridgedSessions() entered... " << std::endl;
// Get the bridge being merged into.
- BridgePrx mergeBridge(0);
-
- size_t count = 0;
- bool done = false;
- size_t sleepInterval = 500; // XXX Make configurable.
- size_t numberOfRetries = 5;
- while(!done && count < numberOfRetries)
- {
- try
- {
- mergeBridge = sessionToReplace->getBridge();
- done = true;
- }
- catch(const Ice::ConnectionLostException&)
- {
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(sleepInterval));
- ++count;
- if(count >= numberOfRetries)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): ConnectionLostException getting bridge for sessionToReplace, failed " << numberOfRetries << " retries." << std::endl;
- throw;
- }
- }
- catch(const NotBridged& e)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): sessionToReplace() is not bridged." << std::endl;
- throw e; // rethrow
- }
- catch(const Ice::Exception& e)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): Ice exception getting bridge for sessionToReplace:" << e.what() << std::endl;
- throw e; // rethrow
- }
- }
-
- SessionSeq preserveSessions;
- try
- {
- SessionSeq sourceSessions = mergeBridge->listSessions();
- for(SessionSeq::iterator s = sourceSessions.begin(); s !=sourceSessions.end(); ++s)
- {
- if (sessionToReplace->ice_getIdentity() != (*s)->ice_getIdentity())
- {
- preserveSessions.push_back(*s);
- }
- }
- }
- catch(const Ice::Exception &e)
- {
- lg(Error) << "connectBridgedSessions(): Unable to get list of sesssions for bridge in connectBridgedSessions(). " ;
- throw e; // rethrow
- }
+ BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
+ SessionSeq preserveSessions = mImpl->getSessionsInBridge(mergeBridge, sessionToReplace);
// Create a listener for the sessions not being replaced to handle early termination.
// The wrapper we're using will remove the listener and free it when
@@ -665,63 +726,9 @@ void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
SessionListenerAllocator listener(mImpl->mAdapter, preserveSessions);
// Get the bridge for the sessions being moved.
+ BridgePrx oldBridge = mImpl->getBridge(bridgedSession);
- BridgePrx oldBridge(0);
- count = 0;
- done = false;
- while(!done && count < numberOfRetries)
- {
- try
- {
- oldBridge = bridgedSession->getBridge();
- done = true;
- }
- catch(const Ice::ConnectionLostException&)
- {
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(sleepInterval));
- ++count;
- if(count >= numberOfRetries)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): ConnectionLostException getting bridge for bridgedSession, failed " << numberOfRetries << " retries." << std::endl;
- throw;
- }
- }
- catch(const NotBridged& e)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): bridgedSession() is not bridged." << std::endl;
- throw e; // rethrow
- }
- catch(const Ice::Exception& e)
- {
- lg(Error) << "connectBridgedSessionsWithDestination(): Ice exception getting bridge for bridgedSession:" << e.what() << std::endl;
- throw e; // rethrow
- }
- }
-
- SessionSeq migratingSessions;
- if (oldBridge != 0)
- {
- try
- {
- // Remove the sessions being moved from their old bridge.
- SessionSeq allSessions = oldBridge->listSessions();
- for(SessionSeq::iterator s = allSessions.begin(); s != allSessions.end(); ++s)
- {
- if ((*s)->ice_getIdentity() != bridgedSession->ice_getIdentity())
- {
- migratingSessions.push_back(*s);
- }
- }
-
- lg(Debug) << "connectBridgedSessions(): Removing migrating sessions from bridge." ;
- oldBridge->removeSessions(migratingSessions);
- }
- catch(const Ice::Exception&)
- {
- lg(Warning) << "connectBridgedSessions(): Unable to remove sessions in connectBridgedSessions(). " ;
- // We won't give up because of this.
- }
- }
+ SessionSeq migratingSessions = mImpl->removeSessionsFromBridge(oldBridge, bridgedSession);
// Check for early termination by the source.
if (listener->isTerminated())
-----------------------------------------------------------------------
--
asterisk-scf/release/routing.git
More information about the asterisk-scf-commits mailing list