[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Jan 20 18:27:41 CST 2011


branch "route_async" has been updated
       via  0f1310bcaa67642b0bc73bea931dfab7a673c49c (commit)
      from  749bfb7746b6be3337c6eb750c129b35993c4297 (commit)

Summary of changes:
 src/BasicRoutingServiceApp.cpp                     |    2 +-
 src/CMakeLists.txt                                 |   17 +-
 src/ConnectBridgedSessionsOperation.cpp            |  153 +++
 src/ConnectBridgedSessionsOperation.h              |   68 ++
 ...nectBridgedSessionsWithDestinationOperation.cpp |  188 ++++
 ...onnectBridgedSessionsWithDestinationOperation.h |   79 ++
 src/EndpointRegistry.cpp                           |  114 +--
 src/RouteSessionOperation.cpp                      |  184 ++++
 src/RouteSessionOperation.h                        |   78 ++
 src/RoutingServiceEventPublisher.cpp               |   49 +-
 src/SessionListener.cpp                            |  255 +++++
 src/SessionListener.h                              |  117 ++
 src/SessionRouter.cpp                              | 1147 +-------------------
 src/SessionRouter.h                                |    6 +-
 src/SessionRouterOperation.cpp                     |  270 +++++
 src/SessionRouterOperation.h                       |  325 ++++++
 src/SimpleWorkQueue.cpp                            |  275 -----
 src/SimpleWorkQueue.h                              |   58 -
 src/WorkQueue.h                                    |   85 --
 19 files changed, 1799 insertions(+), 1671 deletions(-)
 create mode 100644 src/ConnectBridgedSessionsOperation.cpp
 create mode 100644 src/ConnectBridgedSessionsOperation.h
 create mode 100644 src/ConnectBridgedSessionsWithDestinationOperation.cpp
 create mode 100644 src/ConnectBridgedSessionsWithDestinationOperation.h
 create mode 100644 src/RouteSessionOperation.cpp
 create mode 100644 src/RouteSessionOperation.h
 create mode 100644 src/SessionListener.cpp
 create mode 100644 src/SessionListener.h
 create mode 100644 src/SessionRouterOperation.cpp
 create mode 100644 src/SessionRouterOperation.h
 delete mode 100644 src/SimpleWorkQueue.cpp
 delete mode 100644 src/SimpleWorkQueue.h
 delete mode 100644 src/WorkQueue.h


- Log -----------------------------------------------------------------
commit 0f1310bcaa67642b0bc73bea931dfab7a673c49c
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu Jan 20 18:17:37 2011 -0600

    Incorporation of review feedback.

diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index f246a22..5a5bc78 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,7 +30,7 @@
 #include "EndpointRegistry.h"
 #include "RoutingAdmin.h"
 #include "SessionRouter.h"
-#include "SimpleWorkQueue.h"
+#include "AsteriskSCF/WorkQueue/SimpleWorkQueue.h"
 #include "IceLogger.h"
 #include "logger.h"
 
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1f6833a..00fe756 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -19,9 +19,17 @@ asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.cpp)
 asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
 asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
 asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
-asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
-asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
-asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
+
+asterisk_scf_component_add_file(BasicRoutingService SessionRouterOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService SessionRouterOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService RouteSessionOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService RouteSessionOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsWithDestinationOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsWithDestinationOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService SessionListener.h)
+asterisk_scf_component_add_file(BasicRoutingService SessionListener.cpp)
 
 asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
 asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
@@ -34,8 +42,11 @@ include_directories(${logger_dir}/client/src)
 
 include_directories(${utils_dir}/SmartProxy/src)
 
+include_directories(${util_cpp_dir}/WorkQueue/include)
+
 asterisk_scf_component_build_icebox(BasicRoutingService)
 target_link_libraries(BasicRoutingService ${LUA_LIBRARIES})
 target_link_libraries(BasicRoutingService logging-client)
+target_link_libraries(BasicRoutingService WorkQueue)
 
 #asterisk_scf_component_install(BasicRoutingService RUNTIME bin "Basic Routing Service" Core)
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
new file mode 100644
index 0000000..86bdd22
--- /dev/null
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -0,0 +1,153 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include "logger.h"
+
+#include "ConnectBridgedSessionsOperation.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * This class represents an operation to replace one session in a Bridge with sessions 
+ * from another bridge. No routing is actually performed. This operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of 
+ * the connectBridgedSessions() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+
+ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+                                                                const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                                const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
+                                                                const ::Ice::Current& current,
+                                                                const SessionContext& context,
+                                                                OperationsManager* const listener)
+        : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>(cb, 
+                                            context, 
+                                            listener, 
+                                            boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)),
+        mInitiatorCallback(cb),
+        mSessionToReplace(sessionToReplace),
+        mBridgedSession(bridgedSession),
+        mIceCurrent(current)
+{
+}
+
+ConnectBridgedSessionsOperation::~ConnectBridgedSessionsOperation() 
+{
+        lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
+}
+
+/**
+ * This is a state handler for one of this operation's states.
+ * Replace one session in a Bridge with sessions from another bridge.
+ */
+void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
+{
+    lg(Debug) << "connectBridgedSessions() entered... ";
+
+    // Get the bridge being merged into.
+	BridgePrx mergeBridge;
+	try
+	{
+        mergeBridge = getBridge(mSessionToReplace);
+	}
+	catch (const Ice::Exception &e)
+	{
+		finishWithException(e);
+		return;
+	}
+
+	SessionSeq preserveSessions;
+	try
+	{
+        preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
+	}
+	catch (const Ice::Exception &e)
+	{
+		finishWithException(e);
+		return;
+	}
+
+    // Create a listener for the sessions not being replaced to handle early termination.
+    lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+    mListenerManager = listener;
+
+    // Get the bridge for the sessions being moved.
+	BridgePrx oldBridge;
+	try
+	{
+        oldBridge = getBridge(mBridgedSession);
+	}
+	catch (const Ice::Exception &e)
+	{
+		finishWithException(e);
+		return;
+	}
+
+    SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
+
+    // Check for early termination by the source.
+    if (mListenerManager->getListener()->isTerminated())
+    {
+        lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
+        finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
+        return;
+    }
+
+    // We're through listening, and we will probably interfere with the Bridge's functionality if
+    // we keep listening.
+    lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
+    mListenerManager->getListener()->unregister();
+
+    // Now replace the sessions.
+    try
+    {
+        lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
+        mergeBridge->replaceSession(mSessionToReplace, migratingSessions);
+    }
+    catch(const Ice::Exception& e)
+    {
+        lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
+        finishWithException(e);
+        return;
+    }
+
+    // This operation is complete. Send AMD responses. 
+    finishAndSendResult();
+}
+
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
new file mode 100644
index 0000000..17908b3
--- /dev/null
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -0,0 +1,68 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "RoutingIf.h"
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+#include "SessionRouterOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * This class represents an operation to replace one session in a Bridge with sessions 
+ * from another bridge. No routing is actually performed. This operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of 
+ * the connectBridgedSessions() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+class  ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>
+{
+public:
+    ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+                          const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
+                          const Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener);
+
+    virtual ~ConnectBridgedSessionsOperation();
+
+private:
+    /**
+     * This is a state handler for one of this operation's states. 
+     * Replace one session in a Bridge with sessions from another bridge.
+     */
+    void connectBridgedSessionsState();
+
+private:
+    // Operation input params. 
+     AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
+     AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace;
+     AsteriskSCF::SessionCommunications::V1::SessionPrx mBridgedSession;
+    ::Ice::Current mIceCurrent;
+
+}; // class ConnectBridgedSessionsOperation
+
+	
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
new file mode 100644
index 0000000..f2ea73e
--- /dev/null
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -0,0 +1,188 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include "logger.h"
+
+#include "ConnectBridgedSessionsWithDestinationOperation.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable by the 
+ * destination param. This is a specialization of  SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type 
+ * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ * 
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener)
+        : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>(cb, 
+                                context, 
+                                listener, 
+                                boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)),
+        mInitiatorCallback(cb),
+        mSessionToReplace(sessionToReplace),
+        mDestination(destination),
+        mIceCurrent(current)
+{
+}
+
+ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDestinationOperation() 
+{
+        lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
+}
+
+/**
+ * This is a state handler for one of this operation's states. 
+ */
+void ConnectBridgedSessionsWithDestinationOperation::lookupState()
+{
+    lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << mDestination;
+
+	try
+	{
+        mBridge = mSessionToReplace->getBridge();
+	}
+	catch (const Ice::Exception &e)
+	{
+		finishWithException(e);
+		return;
+	}
+
+	try
+	{
+        mRemainingSessions = getSessionsInBridge(mBridge, mSessionToReplace);
+	}
+	catch (const Ice::Exception &e)
+	{
+		finishWithException(e);
+		return;
+	}
+
+    // Create a listener for the sessions not being replaced to handle early termination.
+    // The wrapper we're using will remove the listener and free it when
+    // this method is left.
+    lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
+    mListenerManager = listener;
+
+    // Route the destination
+    lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
+
+    // Set the state to execute after lookup.
+    setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+
+    // Lookup the destination. This will use AMI, and the callback should 
+    // schedule us to execute again. 
+    lg(Debug) << "routeSession(): Routing destination " << mDestination;
+    lookupEndpoints(mDestination, mIceCurrent);
+}
+
+/**
+ * This is a state handler for one of this operation's states. 
+ * Entering this state, the destination endpoint has been obtained. 
+ * This operation is invoked from the worker thread, having been scheduled by
+ * the callback from lookup. 
+ */
+void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
+{
+    if (mFinished)
+    {
+        return;
+    }
+
+    assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed. 
+    if (mLookupResult.size() < 1)
+    {
+        finishWithException(DestinationNotFoundException(mDestination));
+        return;
+    }
+
+    // Add a session 
+    SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination, mListenerManager);
+
+    if (mListenerManager->getListener()->getNumSessions() < 2)
+    {
+        lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << mDestination << " in connectBridgedSessionsWithDestination(). " ;
+        finishWithException(SessionCreationException(mDestination));
+        return;
+    }
+
+    if (mListenerManager->getListener()->isTerminated())
+    {
+        lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
+        finishWithException( SourceTerminatedPreBridgingException(mRemainingSessions[0]->getEndpoint()->getId()));
+        return;
+    }
+
+    // We're through listening, and we will probably interfere with the Bridge's functionality if
+    // we keep listening.
+    mListenerManager->getListener()->unregister();
+
+    // Modify the bridge
+    try
+    {
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << mDestination;
+        mBridge->replaceSession(mSessionToReplace, newSessions);
+    }
+    catch (const Ice::Exception &e)
+    {
+        lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
+        finishWithException(BridgingException(mRemainingSessions[0]->getEndpoint()->getId(), mDestination));
+        return;
+    }
+
+    lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
+
+	try
+	{
+        forwardStart(newSessions);
+	}
+	catch (const Ice::Exception &e)
+	{
+        lg(Debug) << "routeSession(): Exception forwarding start: " << e.what();
+
+        finishWithException(e);
+        return;
+	}
+
+    // This operation is complete. Send AMD responses. 
+    finishAndSendResult();
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
new file mode 100644
index 0000000..95655c5
--- /dev/null
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -0,0 +1,79 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "RoutingIf.h"
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+#include "SessionRouterOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable by the 
+ * destination param. This is a specialization of  SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type 
+ * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ * 
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+ class  ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+{
+public:
+    ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                          const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener);
+
+    virtual ~ConnectBridgedSessionsWithDestinationOperation();
+
+private:
+    /**
+     * This is a state handler for one of this operation's states. 
+     */
+    void lookupState();
+
+    /**
+     * This is a state handler for one of this operation's states. 
+     * Entering this state, the destination endpoint has been obtained. 
+     * This operation is invoked from the worker thread, having been scheduled by
+     * the callback from lookup. 
+     */
+    void establishBridgeState();
+
+private:
+    // Operation input params. 
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
+    AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace;
+    std::string mDestination;
+    ::Ice::Current mIceCurrent;
+
+    // Implementation state
+    AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
+    AsteriskSCF::SessionCommunications::V1::SessionSeq mRemainingSessions;
+
+}; // class ConnectBridgedSessionsWithDestinationOperation
+
+
+	
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 6340009..542a1b0 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -66,6 +66,7 @@ public:
 };
 
 typedef map<std::string, RegisteredLocator>::iterator EndpointLocatorMapIterator;
+typedef map<std::string, RegisteredLocator>::const_iterator EndpointLocatorMapConstIterator;
 typedef map<std::string, RegisteredLocator> EndpointLocatorMap;
 
 /**
@@ -153,8 +154,8 @@ public:
     /**
      * Collect results of AMI lookups from multiple EndpointLocators. 
      */
-    void collectResult(const EndpointSeq& endpoints)
-    {
+    void lookupResult(const EndpointSeq& endpoints)
+	{
         boost::lock_guard<boost::mutex> guard(mLock);
 
         if ((endpoints.size() > 0) && mCallback)
@@ -211,45 +212,6 @@ private:
 };
 typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
 
-/**
- * Callback with the results for EndpointLocator::lookup.  This
- * implementation simply forwards the info on to a LookupCollector.
- *
- * @see EndpointLocator::lookup
- * @see LookupCollector
- */
-class LookupCallback : public IceUtil::Shared
-{
-public:
-    LookupCallback(const LookupResultCollectorPtr& collector) :
-                   mCollector(collector)
-    {
-    }
-
-    ~LookupCallback()
-    {
-        lg(Debug) << "LookupCallback being destroyed. ";
-    }
-
-    void lookupResult(const EndpointSeq& endpoints)
-    {
-        // delegation to thread safe object
-        // no lock needed
-        mCollector->collectResult(endpoints);
-        mCollector = 0;
-    }
-
-    void fail(const Ice::Exception &e)
-    {
-        mCollector->fail(e);
-        mCollector = 0;
-    }
-
-private:
-    LookupResultCollectorPtr mCollector;
-};
-typedef IceUtil::Handle<LookupCallback> LookupCallbackPtr;
-
 
 /**
  * Constructor.
@@ -312,14 +274,11 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
     // Invoke an AMI lookup on each endpointLocator that might be able to satisfy this lookup. 
     for(std::vector<EndpointLocatorPrx>::iterator locator = locatorsToTry.begin(); locator != locatorsToTry.end(); ++locator)
     {
-        // Create our typesafe callback 
-        LookupCallbackPtr callback(new LookupCallback(lookupResultCollector));
-
-        // Wrap our callback for AMI
+        // Wrap our results-collecting callback for AMI
         Callback_EndpointLocator_lookupPtr lookupCallback = 
-                 newCallback_EndpointLocator_lookup(callback, 
-                                                     &LookupCallback::lookupResult,
-                                                     &LookupCallback::fail);
+                 newCallback_EndpointLocator_lookup(lookupResultCollector, 
+                                                     &LookupResultCollector::lookupResult,
+                                                     &LookupResultCollector::fail);
         // Start AMI invocation
         lg(Debug) << "EndpointRegistry::lookup() invoke a lookup for " << destination;
         (*locator)->begin_lookup(destination, lookupCallback);
@@ -328,6 +287,27 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
 }
 
 /**
+ * Non-member utility to do a thread-safe check for an existing map entry.
+ * The shared lock is held only while searching, so 'existing' may be
+ * invalidated if another thread modifies the map after this returns.
+ *   @param existing Receives the entry for locatorId, or the end iterator.
+ *   @return true if an entry for locatorId was found.
+ */
+bool locatorExists(EndpointLocatorMap& map, const std::string& locatorId, boost::shared_mutex& mtx, EndpointLocatorMapIterator& existing)
+{
+        EndpointLocatorMapIterator end;
+
+        {   // critical scope
+            boost::shared_lock<boost::shared_mutex> lock(mtx);
+
+            existing = map.find(locatorId);
+            end = map.end();
+        }
+
+        return existing != end;
+}
+
+/**
  * Register an EndpointLocator that can provide endpoints.
  *   @param id A unique identifier for the added EndpointLocator.
  *   @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
@@ -340,17 +320,10 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
     {
         lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString() << std::endl;
 
-        EndpointLocatorMapIterator existing(0);
-        EndpointLocatorMapIterator end(0);
+        EndpointLocatorMapIterator existing;
+		bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
 
-        {   // critical scope
-            boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
-
-            existing = mImpl->mEndpointLocatorMap.find(locatorId);
-            end = mImpl->mEndpointLocatorMap.end();
-        }
-
-        if (existing != end)
+        if (exists)
         {
             mImpl->eraseLocatorMapItem(existing);
             lg(Info) << "Received request to add endpoint with id " << locatorId << " which already exists. Replacing with new proxy.";
@@ -380,17 +353,10 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
     {
         lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
 
-        EndpointLocatorMapIterator existing(0);
-        EndpointLocatorMapIterator end(0);
-
-        {   // critical scope
-            boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+        EndpointLocatorMapIterator existing;
+		bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
 
-            existing = mImpl->mEndpointLocatorMap.find(locatorId);
-            end = mImpl->mEndpointLocatorMap.end();
-        }
-
-        if (existing == end)
+        if (!exists) // Not registered: warn and publish a FAILURE event.
         {
             lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
             mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
@@ -421,16 +387,10 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
 {
     try
     {
-        EndpointLocatorMapIterator existing(0);
-        EndpointLocatorMapIterator end(0);
-
-        {   // critical scope
-            boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
-            EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
-            end = mImpl->mEndpointLocatorMap.end();
-        }
+        EndpointLocatorMapIterator existing;
+		bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
 
-        if (existing == end)
+        if (!exists)
         {
             mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
             throw DestinationNotFoundException(locatorId);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
new file mode 100644
index 0000000..b3c8faf
--- /dev/null
+++ b/src/RouteSessionOperation.cpp
@@ -0,0 +1,184 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include "logger.h"
+
+#include "RouteSessionOperation.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+// File-local logger: every message emitted from this translation unit goes
+// through the "AsteriskSCF.BasicRoutingService" logger.
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Builds the operation object that services one routeSession() request.
+ *
+ * @param cb          AMD callback of the routeSession() invocation. It is handed
+ *                    to the SessionRouterOperation base and also kept in
+ *                    mInitiatorCallback.
+ * @param source      Proxy of the session that is being routed.
+ * @param destination Destination identifier that will be looked up.
+ * @param current     Ice invocation context; kept so it can be passed along to
+ *                    the endpoint lookup.
+ * @param context     Session context forwarded to the base class.
+ * @param listener    Operations manager forwarded to the base class.
+ *
+ * lookupState() is bound as the state handler given to the base class.
+ */
+RouteSessionOperation::RouteSessionOperation(
+        const AMD_SessionRouter_routeSessionPtr& cb,
+        const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+        const ::std::string& destination,
+        const ::Ice::Current& current,
+        const SessionContext& context,
+        OperationsManager* const listener)
+    : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>(
+          cb,
+          context,
+          listener,
+          boost::bind(&RouteSessionOperation::lookupState, this)),
+      mInitiatorCallback(cb),
+      mSource(source),
+      mDestination(destination),
+      mIceCurrent(current)
+{
+}
+
+/**
+ * Destructor. Does no cleanup of its own; it only leaves a debug trace that
+ * names the destination this operation was created for.
+ */
+RouteSessionOperation::~RouteSessionOperation()
+{
+    lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination;
+}
+
+/**
+ * State handler: entry state of the routeSession() operation.
+ *
+ * Checks that a BridgeManager proxy is available, attaches a listener to the
+ * source session, selects establishBridgeState() as the next state and then
+ * starts the lookup of the destination.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed
+ * from the WorkQueue.
+ */
+void RouteSessionOperation::lookupState()
+{
+    lg(Debug) << "routeSession() entered with destination " << mDestination;
+
+    // Without a bridge manager the operation cannot complete; fail right away.
+    if (!mSessionContext.bridgeManager.initializeOnce())
+    {
+        lg(Error) << "No proxy to BridgeManager.  "
+            "Make sure all services are running.";
+
+        finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+        return;
+    }
+
+    // Listen on the source session so an early termination is noticed.
+    // The manager is stored in mListenerManager (not in a local only), so it
+    // remains available to establishBridgeState().
+    mListenerManager = SessionListenerManagerPtr(
+        new SessionListenerManager(mSessionContext.adapter, mSource));
+
+    // Choose the handler to execute once the endpoints have been looked up.
+    setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+
+    // Look up the destination. Per the original author's note this uses AMI
+    // and its callback schedules this operation to execute again.
+    lg(Debug) << "routeSession(): Routing destination " << mDestination;
+    lookupEndpoints(mDestination, mIceCurrent);
+}
+
+/**
+ * State handler: final state of the routeSession() operation.
+ *
+ * On entry the destination lookup result is expected in mLookupResult. The
+ * handler creates sessions on the looked-up endpoints, verifies that the
+ * source has not terminated in the meantime, stops listening, bridges the
+ * source with the new sessions, forwards start() to the new sessions and
+ * finally completes the operation. Every failure is reported through
+ * finishWithException().
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed
+ * from the WorkQueue.
+ */
+void RouteSessionOperation::establishBridgeState()
+{
+    // The operation may already have been completed; then there is nothing to do.
+    if (mFinished)
+    {
+        return;
+    }
+
+    // A failed lookup should have been handled in EndpointRegistry already.
+    // The explicit check remains for builds in which assert() is compiled out.
+    assert(mLookupResult.size() > 0);
+    if (mLookupResult.size() < 1)
+    {
+        finishWithException(DestinationNotFoundException(mDestination));
+        return;
+    }
+
+    // Add a session to the endpoints.
+    SessionSeq createdSessions = createSessionForEndpoints(mLookupResult, mDestination, mListenerManager);
+
+    SessionListenerImpl* const sessionListener = mListenerManager->getListener();
+
+    // Fewer than two tracked sessions is treated as a session creation failure.
+    if (sessionListener->getNumSessions() < 2)
+    {
+        finishWithException(SessionCreationException(mDestination));
+        return;
+    }
+
+    // The source went away before we could bridge it.
+    if (sessionListener->isTerminated())
+    {
+        finishWithException(SourceTerminatedPreBridgingException(mSource->getEndpoint()->getId()));
+        return;
+    }
+
+    // We're through listening, and we will probably interfere with the
+    // Bridge's functionality if we keep listening.
+    sessionListener->unregister();
+
+    // Create the bridge: the source first, followed by the new sessions.
+    BridgePrx bridge;
+    try
+    {
+        SessionSeq toBridge;
+        toBridge.reserve(1 + createdSessions.size());
+        toBridge.push_back(mSource);
+        toBridge.insert(toBridge.end(), createdSessions.begin(), createdSessions.end());
+
+        lg(Debug) << "routeSession(): Creating bridge.";
+        bridge = mSessionContext.bridgeManager->createBridge(toBridge, 0);
+    }
+    catch (const Ice::Exception &e)
+    {
+        lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+
+        finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+        return;
+    }
+
+    // Forward the start to all the destinations routed to.
+    lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
+
+    try
+    {
+        forwardStart(createdSessions);
+    }
+    catch (const Ice::Exception &e)
+    {
+        lg(Debug) << "routeSession(): Exception forwarding start: " << e.what();
+
+        finishWithException(e);
+        return;
+    }
+
+    // This operation is complete. Send AMD responses.
+    finishAndSendResult();
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
new file mode 100644
index 0000000..2100853
--- /dev/null
+++ b/src/RouteSessionOperation.h
@@ -0,0 +1,78 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "RoutingIf.h"
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+#include "SessionRouterOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Operation object for routeSession(), built on SessionRouterOperation<T>
+ * with T being the AMD callback type of routeSession(). The callback lets
+ * this object deliver results to the initiator of the operation.
+ *
+ * This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread.
+ */
+class RouteSessionOperation
+    : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>
+{
+public:
+    /**
+     * @param cb          AMD callback of the routeSession() invocation.
+     * @param source      Proxy of the session to route.
+     * @param destination Destination identifier to route to.
+     * @param current     Ice invocation context of the request.
+     * @param context     Session context.
+     * @param listener    Operations manager.
+     */
+    RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+                          const ::std::string& destination,
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener);
+
+    virtual ~RouteSessionOperation();
+
+private:
+    /**
+     * State handler for the first state of the operation.
+     * Called via mCurrentStateHandler when doWork() is executed from the
+     * WorkQueue.
+     */
+    void lookupState();
+
+    /**
+     * State handler for the state entered once the destination endpoint has
+     * been obtained; it completes the operation by creating the bridge.
+     * Called via mCurrentStateHandler when doWork() is executed from the
+     * WorkQueue.
+     */
+    void establishBridgeState();
+
+    // Operation input params (declaration order matches the constructor's
+    // initializer list).
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
+    AsteriskSCF::SessionCommunications::V1::SessionPrx mSource;
+    std::string mDestination;
+    ::Ice::Current mIceCurrent;
+
+}; // class RouteSessionOperation
+
+	
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 9bceb87..99a132a 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -99,7 +99,7 @@ public:
         }
         catch (const Ice::NoEndpointException&)
         {
-            assert(0); // All operations callable via icestorm must support oneway. 
+            assert(false); // All operations callable via icestorm must support oneway. 
         }
 
         mEventTopic = Event::RoutingEventsPrx::uncheckedCast(oneway);
@@ -112,6 +112,8 @@ public:
      */
     bool isInitialized()
     {
+		boost::lock_guard<boost::mutex> lock(mLock); 
+
         if (mInitialized)
         {
             return true;
@@ -177,12 +179,9 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
     const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    { // scope for the lock
-        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-        if (!mImpl->isInitialized())
-        {
-            return;
-        }
+    if (!mImpl->isInitialized())
+    {
+        return;
     }
 
     try
@@ -201,12 +200,9 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
 void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
-    { // scope for the lock
-        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-        if (!mImpl->isInitialized())
-        {
-            return;
-        }
+    if (!mImpl->isInitialized())
+    {
+        return;
     }
 
     try
@@ -226,12 +222,9 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
     const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    { // scope for the lock
-        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-        if (!mImpl->isInitialized())
-        {
-            return;
-        }
+    if (!mImpl->isInitialized())
+    {
+        return;
     }
 
     try
@@ -249,12 +242,9 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
  */
 void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
 {
-    { // scope for the lock
-        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-        if (!mImpl->isInitialized())
-        {
-            return;
-        }
+    if (!mImpl->isInitialized())
+    {
+        return;
     }
 
     try
@@ -272,12 +262,9 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
  */
 void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
 {
-    { // scope for the lock
-        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
-        if (!mImpl->isInitialized())
-        {
-            return;
-        }
+    if (!mImpl->isInitialized())
+    {
+        return;
     }
 
     try
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
new file mode 100644
index 0000000..b39af59
--- /dev/null
+++ b/src/SessionListener.cpp
@@ -0,0 +1,255 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011 Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/thread/thread.hpp>
+
+#include "logger.h"
+
+#include "SessionRouter.h"
+#include "SessionListener.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+// File-local logger: every message emitted from this translation unit goes
+// through the "AsteriskSCF.BasicRoutingService" logger.
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Creates a listener that is not terminated and has no proxy yet; the proxy
+ * is supplied later through setProxy().
+ */
+SessionListenerImpl::SessionListenerImpl()
+    : mTerminated(false),
+      mListenerPrx(0)
+{
+}
+
+/**
+ * Destructor; there is nothing to release explicitly.
+ */
+SessionListenerImpl::~SessionListenerImpl()
+{
+}
+
+
+/**
+ * SessionListener callback: a monitored session has stopped.
+ *
+ * Marks this listener as terminated and forwards stop() to every tracked
+ * session except the one that reported the stop.
+ *
+ * @param session      The session that stopped.
+ * @param responseCode Response code passed on unchanged to the other sessions.
+ */
+void SessionListenerImpl::stopped(const SessionPrx& session, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& responseCode, const Ice::Current&)
+{
+    SessionSeq cacheSessions;
+    {   // critical scope
+        // mTerminated is polled through isTerminated(); write it while holding
+        // mLock instead of as a bare, unsynchronized store. The exclusive lock
+        // is required because this scope now modifies state.
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mTerminated = true;
+        cacheSessions = mSessions;
+    }
+
+    // Forward the stop message to all sessions other than the one that originally sent it.
+    // The calls are made on the copy so that no lock is held during remote invocations.
+    for(SessionSeq::iterator s = cacheSessions.begin(); s != cacheSessions.end(); ++s)
+    {
+        try
+        {
+            if (session->ice_getIdentity() != (*s)->ice_getIdentity())
+            {
+                (*s)->stop(responseCode);
+            }
+        }
+        catch(const Ice::Exception &e)
+        {
+            // Best effort: a failure on one session must not keep the others from being stopped.
+            lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what();
+        }
+    }
+}
+
+/**
+ * Starts tracking a session. Only the bookkeeping is done here; this method
+ * does not call addListener() on the session.
+ *
+ * @param session Session to append to the tracked list.
+ */
+void SessionListenerImpl::addSession(const SessionPrx& session)
+{
+    // Exclusive access: the tracked list is modified.
+    boost::unique_lock<boost::shared_mutex> guard(mLock);
+    mSessions.push_back(session);
+}
+
+/**
+ * Starts tracking a session and, when this listener already has a proxy,
+ * registers that proxy as a listener on the session.
+ *
+ * A failure of the remote addListener() call is logged and otherwise
+ * ignored; the session remains in the tracked list.
+ *
+ * @param session Session to track and listen to.
+ */
+void SessionListenerImpl::addSessionAndListen(SessionPrx session)
+{
+    {   // critical scope
+        boost::unique_lock<boost::shared_mutex> guard(mLock);
+        mSessions.push_back(session);
+    }
+
+    // Without a proxy there is nothing that could be registered on the session.
+    const bool haveProxy = (mListenerPrx != 0);
+    if (!haveProxy)
+    {
+        return;
+    }
+
+    try
+    {
+        lg(Debug) << "Adding listener to session.";
+        session->addListener(mListenerPrx);
+    }
+    catch(const Ice::Exception &e)
+    {
+        lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what();
+    }
+}
+
+/**
+ * @return Number of sessions currently tracked by this listener.
+ */
+const int SessionListenerImpl::getNumSessions()
+{
+    boost::shared_lock<boost::shared_mutex> guard(mLock);
+    // The declared return type is int, so the container size is narrowed here.
+    return static_cast<int>(mSessions.size());
+}
+
+/**
+ * Reports whether stopped() has been received by this listener.
+ *
+ * @return true once mTerminated has been set.
+ */
+bool SessionListenerImpl::isTerminated()
+{
+    // Read the flag under mLock rather than as a bare, unsynchronized load;
+    // mSessions is already guarded by the same mutex.
+    boost::shared_lock<boost::shared_mutex> lock(mLock);
+    return mTerminated;
+}
+
+/**
+ * Stop listening to all sessions we're monitoring.
+ *
+ * Removes this listener's proxy from every tracked session and then forgets
+ * all tracked sessions. A failure on one session is logged and does not stop
+ * the remaining ones from being processed.
+ */
+void SessionListenerImpl::unregister()
+{
+    // Work on a snapshot so that no lock is held during the remote calls.
+    SessionSeq snapshot;
+    {   // critical scope
+        boost::shared_lock<boost::shared_mutex> guard(mLock);
+        snapshot = mSessions;
+    }
+
+    lg(Debug) << "unregister() called with " << snapshot.size() << " sessions to remove listener from.";
+
+    for (SessionSeq::iterator it = snapshot.begin(); it != snapshot.end(); ++it)
+    {
+        try
+        {
+            lg(Debug) << "Removing listener from session " << (*it)->ice_toString();
+            (*it)->removeListener(mListenerPrx);
+        }
+        catch(const Ice::Exception &e)
+        {
+            lg(Error) << "Exception removing listener from session " << (*it)->ice_toString() << ". Details: " << e.what();
+        }
+    }
+
+    // Since we're through listening to them, we should just drop our references to them.
+    {   // critical scope
+        boost::unique_lock<boost::shared_mutex> guard(mLock);
+        mSessions.clear();
+    }
+}
+
+/**
+ * @return The proxy last stored with setProxy(); a null proxy if none was set.
+ */
+SessionListenerPrx SessionListenerImpl::getProxy()
+{
+    return mListenerPrx;
+}
+
+/**
+ * Stores the proxy under which this listener registers itself on sessions.
+ *
+ * @param prx Proxy referring to this listener.
+ */
+void SessionListenerImpl::setProxy(const SessionListenerPrx& prx)
+{
+    mListenerPrx = prx;
+}
+
+/**
+ * Constructor for SessionListenerManager (single session).
+ *
+ * Creates a SessionListenerImpl, adds it to the adapter as a servant, gives
+ * it its own proxy and attaches it to the given session.
+ *
+ * @param adapter Object adapter that hosts the listener servant.
+ * @param session Session to monitor.
+ *
+ * NOTE(review): any exception is swallowed after the servant has been removed
+ * from the adapter again. mSessionListener is a raw pointer and the
+ * destructor still uses it; if the adapter held the only reference, that
+ * pointer presumably dangles afterwards -- confirm and consider reporting the
+ * failure to the caller.
+ */
+SessionListenerManager::SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
+    : mSessionListener(new SessionListenerImpl()),
+      mAdapter(adapter)
+{
+    Ice::ObjectPrx servantPrx = adapter->addWithUUID(mSessionListener);
+
+    try
+    {
+        mSessionListener->setProxy(SessionListenerPrx::checkedCast(servantPrx));
+        mSessionListener->addSessionAndListen(session);
+    }
+    catch(...)
+    {
+        // Undo the servant registration; a failure of the cleanup itself is ignored.
+        try
+        {
+            mAdapter->remove(servantPrx->ice_getIdentity());
+        }
+        catch(...)
+        {
+        }
+    }
+}
+
+/**
+ * Constructor for SessionListenerManager (sequence of sessions).
+ *
+ * Creates a SessionListenerImpl, adds it to the adapter as a servant, gives
+ * it its own proxy and attaches it to every session of the sequence.
+ *
+ * @param adapter         Object adapter that hosts the listener servant.
+ * @param sessionSequence Sessions to monitor.
+ *
+ * NOTE(review): any exception is swallowed after the servant has been removed
+ * from the adapter again. mSessionListener is a raw pointer and the
+ * destructor still uses it; if the adapter held the only reference, that
+ * pointer presumably dangles afterwards -- confirm and consider reporting the
+ * failure to the caller.
+ */
+SessionListenerManager::SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionSeq& sessionSequence)
+    : mSessionListener(new SessionListenerImpl()),
+      mAdapter(adapter)
+{
+    Ice::ObjectPrx servantPrx = adapter->addWithUUID(mSessionListener);
+
+    try
+    {
+        mSessionListener->setProxy(SessionListenerPrx::checkedCast(servantPrx));
+
+        for (SessionSeq::const_iterator it = sessionSequence.begin(); it != sessionSequence.end(); ++it)
+        {
+            mSessionListener->addSessionAndListen(*it);
+        }
+    }
+    catch(...)
+    {
+        // Undo the servant registration; a failure of the cleanup itself is ignored.
+        try
+        {
+            mAdapter->remove(servantPrx->ice_getIdentity());
+        }
+        catch(...)
+        {
+        }
+    }
+}
+
+/**
+ * Tears the listener down in two steps: first detach it from all monitored
+ * sessions, then remove the servant from the object adapter. Exceptions of
+ * either step are logged and never leave the destructor.
+ */
+SessionListenerManager::~SessionListenerManager()
+{
+    // Our private SessionListener implementation adds itself as a servant. It
+    // can't really undo that without getting itself deleted. So undo it
+    // in proper order.
+
+    // Step 1: stop listening to the sessions.
+    try
+    {
+        lg(Debug) << "About to unregister the listener...";
+
+        mSessionListener->unregister();
+    }
+    catch(const std::exception& e)
+    {
+        lg(Debug) << "Exception unregistering: " << e.what();
+    }
+
+    // Step 2: remove the servant from the adapter.
+    try
+    {
+        // Only the adapter holds a smart pointer for this servant, so this will
+        // cause it to be deleted.
+        lg(Debug) << "Removing listener from object adapter.";
+        mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
+    }
+    catch(const std::exception& e)
+    {
+        lg(Debug) << "Exception removing listener from Object Adatper " << e.what();
+    }
+}
+
+/**
+ * @return The managed listener as a non-owning raw pointer.
+ */
+SessionListenerImpl* SessionListenerManager::getListener() const
+{
+    return mSessionListener;
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/SessionListener.h b/src/SessionListener.h
new file mode 100644
index 0000000..0466b1b
--- /dev/null
+++ b/src/SessionListener.h
@@ -0,0 +1,117 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/thread/shared_mutex.hpp>
+
+#include <Ice/Ice.h>
+
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Listener used to monitor sessions during the routing process. Primarily
+ * used to ensure the relevant sessions haven't stopped prior to being bridged.
+ */
+class SessionListenerImpl : public AsteriskSCF::SessionCommunications::V1::SessionListener
+{
+public:
+    SessionListenerImpl();
+    ~SessionListenerImpl();
+
+public: // Overrides for SessionListener. Only stopped() does any work.
+
+    void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& /*session*/,
+                   const Ice::Current&) {}     // No-op
+
+    void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx& /*session*/,
+                 const Ice::Current&) {}       // No-op
+
+    void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx& /*session*/,
+              const Ice::Current&) {}          // No-op
+
+    void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& /*session*/,
+                     const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&,
+                     const Ice::Current&) {}   // No-op
+
+    void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& /*session*/,
+                 const Ice::Current&) {}       // No-op
+
+    void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx& /*session*/,
+                const Ice::Current&) {}        // No-op
+
+    /**
+     * Called when a monitored session has stopped.
+     */
+    void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+                 const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& responseCode,
+                 const Ice::Current&);
+
+public: // Impl operations
+
+    /**
+     * Adds a session to be tracked by the listener.
+     */
+    void addSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+    /**
+     * Add a session to be tracked by this listener, and attach this listener to the session.
+     */
+    void addSessionAndListen(AsteriskSCF::SessionCommunications::V1::SessionPrx session);
+
+    /**
+     * Number of sessions currently tracked.
+     */
+    const int getNumSessions();
+
+    /**
+     * Whether a stop has been reported to this listener.
+     */
+    bool isTerminated();
+
+    /**
+     * Stop listening to all sessions we're monitoring.
+     */
+    void unregister();
+
+    /**
+     * Accessors for the proxy that refers to this listener.
+     */
+    AsteriskSCF::SessionCommunications::V1::SessionListenerPrx getProxy();
+    void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& prx);
+
+private:
+    boost::shared_mutex mLock;
+
+    AsteriskSCF::SessionCommunications::V1::SessionSeq mSessions;
+    bool mTerminated;
+    AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListenerPrx;
+
+}; // class SessionListenerImpl
+
+typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
+
+/**
+ * This class manages the lifecycle of a session listener: the constructors
+ * create the listener and attach it to one or more sessions, the destructor
+ * detaches it again.
+ */
+class SessionListenerManager
+{
+public:
+    /** Creates a listener and attaches it to a single session. */
+    SessionListenerManager(Ice::ObjectAdapterPtr adapter, const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+    /** Creates a listener and attaches it to every session of the sequence. */
+    SessionListenerManager(Ice::ObjectAdapterPtr adapter, const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessionSequence);
+
+    ~SessionListenerManager();
+
+    /** Returns the managed listener as a raw pointer. */
+    SessionListenerImpl* getListener() const;
+
+private:
+    SessionListenerImpl* mSessionListener; // raw pointer to the listener created by the constructors
+    Ice::ObjectAdapterPtr mAdapter;        // adapter the listener is added to
+
+}; // class SessionListenerManager
+
+typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index fcf6eef..5bba4a5 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -19,12 +19,18 @@
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
 
+#include "AsteriskSCF/WorkQueue/WorkQueue.h"
+
 #include "SessionRouter.h"
+#include "RouteSessionOperation.h"
+#include "ConnectBridgedSessionsOperation.h"
+#include "ConnectBridgedSessionsWithDestinationOperation.h"
+
+#include "SessionListener.h"
 #include "EndpointRegistry.h"
 #include "RoutingIf.h"
 #include "EndpointIf.h"
 #include "logger.h"
-#include "WorkQueue.h"
 
 using namespace AsteriskSCF;
 using namespace AsteriskSCF::Core::Routing::V1;
@@ -40,1148 +46,11 @@ namespace
 Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
 }
 
-/**
- * TBD... This code should be refactored for AMD and use AMI on outgoing calls.
- */
 namespace AsteriskSCF
 {
 namespace BasicRoutingService
 {
 
-/**
- * A simple utility to make the retry process a little cleaner until a SmartProxy
- * or other such mechanism has this functionality built-in. 
- */
-class RetryPolicy
-{
-public:
-    /**
-     * Constructor:
-     *  @param maxRetries Maximum number of times to retry. 
-     *  @intervalInMilliseconds Will sleep this amount in the retry() method. 
-     */
-    RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
-        mMaxRetries(maxRetries),
-        mRetryIntervalMilliseconds(intervalInMilliseconds),
-        mCounter(0)
-    {
-        assert(maxRetries < 0xffff);
-    }
-
-    /**
-     * Indicates whether additional retries are warrented. 
-     */
-    bool canRetry()
-    {
-        return mCounter <= mMaxRetries;
-    }
-
-    /**
-     * User must call this after each failed attempt. Applies the delay between calls and does
-     * bookkeeping.
-     */
-    bool retry()
-    {
-        ++mCounter;
-        lg(Debug) << "Retrying for the " << mCounter << " time.";
-
-        bool doRetry = canRetry();
-
-        if (doRetry)
-        {
-            IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryIntervalMilliseconds));
-        }
-
-        return doRetry;
-    }
-
-    /**
-     * Accessor for the number of retries allowed. 
-     */
-    size_t maxRetries()
-    {
-        return mMaxRetries;
-    }
-
-private:
-    size_t mMaxRetries;
-    size_t mRetryIntervalMilliseconds;
-    size_t mCounter;
-
-}; // class RetryPolicy
-
-/**
- * Listener used to monitor sessions during the routing process. Primarily used to 
- * insure the relevant sessions haven't stopped prior to being bridged. 
- */
-class SessionListenerImpl : public SessionListener
-{
-public:
-    SessionListenerImpl() :
-        mTerminated(false), mListenerPrx(0)
-    {
-    }
-
-    ~SessionListenerImpl()
-    {
-    }
-
-public: // The following operations are implementations of the SessionListener interface.
-
-    void connected(const SessionPrx& session, const Ice::Current&)
-    {
-    }
-
-    void flashed(const SessionPrx& session, const Ice::Current&)
-    {
-    }
-
-    void held(const SessionPrx& session, const Ice::Current&)
-    {
-    }
-
-    void progressing(const SessionPrx& session, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&)
-    {
-    }
-
-    void ringing(const SessionPrx& session, const Ice::Current&)
-    {
-    }
-
-    void stopped(const SessionPrx& session, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& responseCode, const Ice::Current&)
-    {
-        mTerminated = true;
-
-        SessionSeq cacheSessions;
-        {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            cacheSessions = mSessions;
-        }
-
-        // Forward the stop message to all sessions other than the one that originally sent it.
-        for(SessionSeq::iterator s = cacheSessions.begin(); s != cacheSessions.end(); ++s)
-        {
-            try
-            {
-                if (session->ice_getIdentity() != (*s)->ice_getIdentity())
-                {
-                    (*s)->stop(responseCode);
-                }
-            }
-            catch(const Ice::Exception &e)
-            {
-                lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what();
-            }
-        }
-
-    }
-
-    void unheld(const SessionPrx& session, const Ice::Current&)
-    {
-    }
-
-public: 
-
-    /**
-     * Adds a session to be tracked by the listener. This operation doesn't actually call addListener on the Session.
-     * When we ask an endpoint to create a session, we pass our listener in to the creation process.
-     * So the listener has been attached to the session, but we want to keep track of it in this object.
-     */
-    void addSession(SessionPrx session)
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mSessions.push_back(session);
-    }
-
-    /**
-     * Add a session to be tracked by this listener, and attach this listener to the session.
-     */
-    void addSessionAndListen(SessionPrx session)
-    {
-        {   // critical scope
-            boost::unique_lock<boost::shared_mutex> lock(mLock);
-            mSessions.push_back(session);
-        }
-
-        if (mListenerPrx != 0)
-        {
-            try
-            {
-                lg(Debug) << "Adding listener to session." ;
-                session->addListener(mListenerPrx);
-            }
-            catch(const Ice::Exception &e)
-            {
-                lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what();
-            }
-        }
-    }
-
-    const int getNumSessions()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return mSessions.size();
-    }
-
-    bool isTerminated() // Lots of shoring up to do for asynchronous operations!
-    {
-        return mTerminated;
-    }
-
-    /**
-     * Stop listening to all sessions we're monitoring.
-     */
-    void unregister()
-    {
-        SessionSeq sessionsToCall;
-        {   // critical scope
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            sessionsToCall = mSessions;
-        }
-
-        lg(Debug) << "unregister() called with " << sessionsToCall.size() << " sessions to remove listener from.";
-
-        for(SessionSeq::iterator s=sessionsToCall.begin(); s != sessionsToCall.end(); ++s)
-        {
-            try
-            {
-                lg(Debug) << "Removing listener from session " << (*s)->ice_toString();
-                (*s)->removeListener(mListenerPrx);
-            }
-            catch(const Ice::Exception &e)
-            {
-                lg(Error) << "Exception removing listener from session " << (*s)->ice_toString() << ". Details: " << e.what();
-            }
-        }
-
-        // Since we're through listening to them, we should just drop our references to them.
-        {   // critical scope
-            boost::unique_lock<boost::shared_mutex> lock(mLock);
-            mSessions.clear();
-        }
-    }
-
-    SessionListenerPrx getProxy()
-    {
-        return mListenerPrx;
-    }
-
-    void setProxy(const SessionListenerPrx& prx)
-    {
-        mListenerPrx = prx;
-    }
-
-private:
-    boost::shared_mutex mLock;
-
-    SessionSeq mSessions;
-    bool mTerminated;
-    SessionListenerPrx mListenerPrx;
-
-}; // class SessionListenerImpl
-
-typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
-
-/**
- * This class manages the lifecycle of a session listener.
- */
-class SessionListenerManager
-{
-public:
-    SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
-        : mSessionListener(new SessionListenerImpl()),
-          mAdapter(adapter)
-    {
-        Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
-
-        try
-        {
-            SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
-            mSessionListener->setProxy(listenerProxy);
-
-            mSessionListener->addSessionAndListen(session);
-        }
-        catch(...)
-        {
-            try
-            {
-               mAdapter->remove(prx->ice_getIdentity());
-            }
-            catch(...)
-            {
-            }
-        }
-    }
-
-    SessionListenerManager(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
-        : mSessionListener(new SessionListenerImpl()),
-          mAdapter(adapter)
-    {
-        Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
-
-        try
-        {
-            SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
-            mSessionListener->setProxy(listenerProxy);
-
-            for(SessionSeq::iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
-            {
-                mSessionListener->addSessionAndListen(*s);
-            }
-        }
-        catch(...)
-        {
-            try
-            {
-               mAdapter->remove(prx->ice_getIdentity());
-            }
-            catch(...)
-            {
-            }
-        }
-    }
-
-    ~SessionListenerManager()
-    {
-        // Our private SessionListener implementation adds itself as a servant. It
-        // can't really undo that without getting itself deleted. So undo it
-        // in proper order.
-        try
-        {
-            lg(Debug) << "About to unregister the listener..." ;
-
-            mSessionListener->unregister();
-        }
-        catch(const std::exception& e)
-        {
-            lg(Debug) << "Exception unregistering: " << e.what() ;
-        }
-
-        try
-        {
-            // Only the adapter holds a smart pointer for this servant, so this will
-            // cause it to be deleted.
-            lg(Debug) << "Removing listener from object adapter." ;
-            mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
-        }
-        catch(const std::exception& e)
-        {
-            lg(Debug) << "Exception removing listener from Object Adatper " << e.what() ;
-        }
-    }
-
-    SessionListenerImpl* getListener() const
-    {
-        return mSessionListener;
-    }
-
-private:
-    SessionListenerImpl *mSessionListener;
-    Ice::ObjectAdapterPtr mAdapter;
-
-}; // class SessionListenerManager
-
-typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
-
-/**
- * Context required by all of the SessionRouter operations. 
- *  All of the items in the SessionContext provide thread-safe interfaces. (i.e. no lock required). 
- */
-struct SessionContext
-{
-public:
-    /**
-     * Constructor. The BridgeManager isn't initialized. It's configured later via a setter
-     * due to component initialization sequence. 
-     */
-    SessionContext(const Ice::ObjectAdapterPtr& adapter,
-                            const EndpointRegistryPtr& registry,
-                            const RoutingEventsPtr& publisher,
-                            const boost::shared_ptr<WorkQueue>& workQueue)
-                                  :  adapter(adapter),
-                                     endpointRegistry(registry),
-                                     eventPublisher(publisher),
-                                     workQueue(workQueue)
-    {
-    }
-
-    Ice::ObjectAdapterPtr adapter;
-    EndpointRegistryPtr endpointRegistry;
-    RoutingEventsPtr eventPublisher;
-    boost::shared_ptr<WorkQueue> workQueue;
-    AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
-};
-
-/** 
- * An interface for an object that manages a collection of SessionRouterOperations. 
- */
-class OperationsManager
-{
-public:
-    /**
-     * Operations indicate they are finished by calling this method.
-     */
-    virtual void finished(WorkQueue::Work *) = 0;
-
-    /**
-     * Operations can reschedule themselves on the WorkQueue. 
-     */
-    virtual void reschedule(WorkQueue::Work *) = 0;
-
-protected:
-    OperationsManager() {}  // Can't construct directly. This class is an interface. 
-};
-
-// Forward-declaration
-template <typename T> class LookupCallback;
-
-/**
- * This is a base class for worker objects that offload SessionRouter operations 
- * to a worker thread during an AMD invocation. It implements the WorkQueue::Work
- * interface so that it can be enqueued to a worker thread or thread pool. 
- * The template parameter T is the type of the AMD callback type for the 
- * particular operation. 
- */
-template<typename T>
-class SessionRouterOperation : public WorkQueue::Work
-{
-public:
-    /**
-     * Constructor. 
-     *  @param amdCallback The callback object to provide results to the initiator of this operation.
-     *  @param context The SessionContext provides references to key objects needed by each operation. 
-     *  @param manager 
-     */
-    SessionRouterOperation(const T& amdCallback,
-                           const SessionContext& context,
-                           OperationsManager* manager,
-                           const boost::function<void ()> &initialStateHandler) 
-       : mInitiatorCallback(amdCallback),
... 1792 lines suppressed ...


-- 
team/ken.hunt/route_async_routing.git



More information about the asterisk-scf-commits mailing list