[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Dec 23 01:29:31 UTC 2010


branch "route_async" has been updated
       via  749bfb7746b6be3337c6eb750c129b35993c4297 (commit)
       via  3f8b609fc14db8635d6d5c917b4b28b89f0e5c0e (commit)
      from  a52053b5f138ba4b26c9474c906d0e59df196379 (commit)

Summary of changes:
 src/SessionRouter.cpp |    1 -
 1 files changed, 0 insertions(+), 1 deletions(-)


- Log -----------------------------------------------------------------
commit 749bfb7746b6be3337c6eb750c129b35993c4297
Merge: 3f8b609 a52053b
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed Dec 22 19:17:52 2010 -0600

    Merge branch 'route_async' of gitdepot:team/ken.hunt/route_async_routing into route_async
    Conflicts:
    	src/SessionRouter.cpp

diff --cc src/SessionRouter.cpp
index 2e28b94,b35bc34..fcf6eef
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@@ -963,7 -963,7 +963,6 @@@ private
  
          // Set the state to exectute after lookup. 
          setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
--       // mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
  
          // Lookup the destination. This will use AMI, and the callback should 
          // schedule us to execute again. 

commit 3f8b609fc14db8635d6d5c917b4b28b89f0e5c0e
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed Dec 22 19:11:23 2010 -0600

    Routing async support.

diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 3920cc2..f246a22 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,6 +30,7 @@
 #include "EndpointRegistry.h"
 #include "RoutingAdmin.h"
 #include "SessionRouter.h"
+#include "SimpleWorkQueue.h"
 #include "IceLogger.h"
 #include "logger.h"
 
@@ -56,11 +57,14 @@ namespace BasicRoutingService
 class BasicRoutingServiceApp : public IceBox::Service
 {
 public:
-    BasicRoutingServiceApp() :
-        mDone(false), mInitialized(false), mRunning(false)
+    BasicRoutingServiceApp() 
+        : mDone(false), 
+          mInitialized(false), 
+          mRunning(false),
+          mWorkQueue( new SimpleWorkQueue("SessionRouterWorkQueue", lg))
     {
-
     }
+
     ~BasicRoutingServiceApp()
     {
         // Smart pointers do your thing.
@@ -88,6 +92,7 @@ private:
     bool mDone;
     bool mInitialized;
     bool mRunning;
+    boost::shared_ptr<SimpleWorkQueue> mWorkQueue;
 
     std::string mAppName;
     ServiceLocatorManagementPrx mServiceLocatorManagement;
@@ -261,7 +266,7 @@ void BasicRoutingServiceApp::initialize()
         mAdapter->add(mEndpointRegistry, mCommunicator->stringToIdentity(RegistryLocatorObjectId));
 
         // Create publish the SessionRouter interface.
-        SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher));
+        SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher, mWorkQueue));
         BasicSessionRouterPtr basicSessionPtr(rawSessionRouter);
         mSessionRouter = basicSessionPtr;
         mAdapter->add(rawSessionRouter, mCommunicator->stringToIdentity(SessionRouterObjectId));
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04c5eda..1f6833a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -19,6 +19,9 @@ asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.cpp)
 asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
 asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
 asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
+asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
 
 asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
 asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 6eecc72..6340009 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -22,6 +22,7 @@
 #include "ScriptProcessor.h"
 #include "logger.h"
 
+using namespace ::AsteriskSCF::Core::Endpoint::V1;
 using namespace ::AsteriskSCF::Core::Routing::V1;
 using namespace ::AsteriskSCF::System::Logging;
 using namespace ::AsteriskSCF::Core::Routing::V1::Event;
@@ -117,6 +118,140 @@ public:
 };
 
 /**
+ * A collector for lookup() operation AMI replies. 
+ */
+class LookupResultCollector : public IceUtil::Shared
+{
+public:
+    /**
+     * Constructor.
+     * @param callback AMD callback used to answer the lookup, at most once.
+     * @param numVotes The number of locator replies (collectResult() or fail() calls) expected.
+     */
+    LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback, 
+                          const std::string& destination,
+                          const RoutingEventsPtr& eventPublisher, 
+                          int numVotes) 
+                : mCallback(callback), 
+                  mNumVotes(numVotes),
+                  mEventPublisher(eventPublisher),
+                  mDestination(destination)
+    {
+        assert(mNumVotes >= 0);
+
+        if (mNumVotes == 0)
+        {
+           notifyFailed();
+        }
+    }
+
+    ~LookupResultCollector()
+    {
+        lg(Debug) << "LookupResultCollector being destroyed. ";
+    }
+
+    /**
+     * Collect results of AMI lookups from multiple EndpointLocators. 
+     */
+    void collectResult(const EndpointSeq& endpoints)
+    {
+        boost::lock_guard<boost::mutex> guard(mLock);
+
+        if ((endpoints.size() > 0) && mCallback)
+        {
+            mCallback->ice_response(endpoints);
+
+            // clear the mCallback pointer so we only answer once
+            mCallback = 0;
+
+            lg(Debug) << "EndpointRegistry::lookup() found Endpoint for destination " << mDestination;
+          
+            // Post event
+            mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
+        }
+
+        assert(mNumVotes > 0); // more replies arrived than numVotes allowed
+
+        if (--mNumVotes == 0 && mCallback)
+        {
+            notifyFailed();
+        }
+    }
+
+    void fail(const Ice::Exception &e)
+    {
+        boost::lock_guard<boost::mutex> guard(mLock);
+
+        if (--mNumVotes == 0 && mCallback)
+        {
+            notifyFailed();
+        }
+    }
+
+    void notifyFailed()
+    {
+        DestinationNotFoundException e(mDestination);
+        mCallback->ice_exception(e);
+
+        // clear the mCallback pointer so we only answer once
+        mCallback = 0;
+
+        // Post event
+        mEventPublisher->lookupEvent(mDestination, Event::FAILURE);
+
+        lg(Debug) << "EndpointRegistry::lookup() failed to find destination " << mDestination;
+    }
+
+private:
+    boost::mutex mLock;
+    AMD_EndpointLocator_lookupPtr mCallback;
+    int mNumVotes;
+    RoutingEventsPtr mEventPublisher;
+    std::string mDestination;
+};
+typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
+
+/**
+ * Callback with the results for EndpointLocator::lookup.  This
+ * implementation simply forwards the info on to a LookupResultCollector.
+ *
+ * @see EndpointLocator::lookup
+ * @see LookupResultCollector
+ */
+class LookupCallback : public IceUtil::Shared
+{
+public:
+    LookupCallback(const LookupResultCollectorPtr& collector) :
+                   mCollector(collector)
+    {
+    }
+
+    ~LookupCallback()
+    {
+        lg(Debug) << "LookupCallback being destroyed. ";
+    }
+
+    void lookupResult(const EndpointSeq& endpoints)
+    {
+        // delegation to thread safe object
+        // no lock needed
+        mCollector->collectResult(endpoints);
+        mCollector = 0;
+    }
+
+    void fail(const Ice::Exception &e)
+    {
+        mCollector->fail(e);
+        mCollector = 0;
+    }
+
+private:
+    LookupResultCollectorPtr mCollector;
+};
+typedef IceUtil::Handle<LookupCallback> LookupCallbackPtr;
+
+
+/**
  * Constructor.
  */
 EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher) :
@@ -124,6 +259,74 @@ EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const Routi
 {
 }
 
+    
+/**
+ * Asynchronously looks up the endpoints that match the specified destination id; the reply is sent through the AMD callback.
+ *   @param destination String identifier of the destination.
+ */
+void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& amdcallback, 
+                                    const ::std::string& destination, 
+                                    const ::Ice::Current&)
+{
+    AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+
+    lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
+
+    string modifiedDestination(destination);
+    if (mImpl->mScriptProcessor.get() != 0)
+    {
+        if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
+        {
+            mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
+
+            lg(Error) << "lookup(): denied by confirmLookup() script.";
+            amdcallback->ice_response(endpoints);
+            return;
+        }
+    }
+
+    std::vector<EndpointLocatorPrx> locatorsToTry;
+
+    // Iterate over all registered EndpointLocators and check their regular expressions against the destination.
+    EndpointLocatorMap locatorMap;
+    mImpl->getEndpointLocatorMapCopy(locatorMap);
+    for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
+    {
+        // Test to see if the destination matches any of this entry's regular expressions.
+        for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
+        {
+            if (boost::regex_match(modifiedDestination, *reg))
+            {
+                lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << " at " << entry->first;
+                locatorsToTry.push_back(entry->second.locator);
+            }
+        }
+    }
+
+    // Create a single results collector for the AMI callbacks to reference. 
+    LookupResultCollectorPtr lookupResultCollector(new LookupResultCollector(amdcallback, 
+                                                                             destination,
+                                                                             mImpl->mEventPublisher,
+                                                                             locatorsToTry.size()));
+
+    // Invoke an AMI lookup on each endpointLocator that might be able to satisfy this lookup. 
+    for(std::vector<EndpointLocatorPrx>::iterator locator = locatorsToTry.begin(); locator != locatorsToTry.end(); ++locator)
+    {
+        // Create our typesafe callback 
+        LookupCallbackPtr callback(new LookupCallback(lookupResultCollector));
+
+        // Wrap our callback for AMI
+        Callback_EndpointLocator_lookupPtr lookupCallback = 
+                 newCallback_EndpointLocator_lookup(callback, 
+                                                     &LookupCallback::lookupResult,
+                                                     &LookupCallback::fail);
+        // Start AMI invocation
+        lg(Debug) << "EndpointRegistry::lookup() invoke a lookup for " << destination;
+        (*locator)->begin_lookup(destination, lookupCallback);
+    }
+
+}
+
 /**
  * Register an EndpointLocator that can provide endpoints.
  *   @param id A unique identifier for the added EndpointLocator.
@@ -248,63 +451,6 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
 }
 
 /**
- * Returns the endpoints that match the specified destination id.
- *   @param id String identifier of the the destination.
- */
-AsteriskSCF::Core::Endpoint::V1::EndpointSeq EndpointRegistry::lookup(const std::string& destination, const Ice::Current&)
-{
-    AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
-
-    lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
-
-    string modifiedDestination(destination);
-    if (mImpl->mScriptProcessor.get() != 0)
-    {
-        if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
-        {
-            mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
-
-            lg(Error) << "lookup(): denied by confirmLookup() script.";
-            return endpoints;
-        }
-    }
-
-    EndpointLocatorMap locatorMap;
-    mImpl->getEndpointLocatorMapCopy(locatorMap);
-
-    for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
-    {
-        // Test to see if the destination matches any of this entry's regular expressions.
-        for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
-        {
-            if (boost::regex_match(modifiedDestination, *reg))
-            {
-                lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << ". Calling remote lookup()";
-
-                try
-                {
-                    endpoints = entry->second.locator->lookup(modifiedDestination);
-                }
-                catch (const IceUtil::Exception& e)
-                {
-                    lg(Error) << "Exception calling registered EndpointLocator for " << entry->first << " Details: " << e.what();
-                }
-                break;
-            }
-        }
-    }
-
-    Event::OperationResult result(Event::FAILURE);
-    if (endpoints.size() > 0)
-    {
-        result = Event::SUCCESS;
-    }
-    mImpl->mEventPublisher->lookupEvent(destination, result);
-
-    return endpoints;
-}
-
-/**
  * Configure this object with a ScriptProcessor.
  */
 void EndpointRegistry::setScriptProcessor(ScriptProcessor* scriptProcessor)
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index d85a015..775338e 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -73,7 +73,7 @@ public:
      * Returns the endpoints that match the specified destination id.
      *   @param id String identifier of the the destination.
      */
-    AsteriskSCF::Core::Endpoint::V1::EndpointSeq lookup(const std::string& destination, const Ice::Current&);
+    virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
 
 public:
 
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 8aae895..9bceb87 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -16,6 +16,8 @@
 #include <Ice/Ice.h>
 #include <IceStorm/IceStorm.h>
 
+#include <boost/thread/mutex.hpp>
+
 #include "RoutingServiceEventPublisher.h"
 #include "logger.h"
 
@@ -42,6 +44,7 @@ public:
     RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter) :
         mAdapter(adapter), mInitialized(false)
     {
+        boost::lock_guard<boost::mutex> lock(mLock); 
         initialize();
     }
 
@@ -89,7 +92,17 @@ public:
         }
 
         Ice::ObjectPrx publisher = topic->getPublisher();
-        mEventTopic = Event::RoutingEventsPrx::uncheckedCast(publisher);
+        Ice::ObjectPrx oneway;
+        try
+        {
+            oneway = publisher->ice_oneway();
+        }
+        catch (const Ice::NoEndpointException&)
+        {
+            assert(0); // All operations callable via icestorm must support oneway. 
+        }
+
+        mEventTopic = Event::RoutingEventsPrx::uncheckedCast(oneway);
 
         mInitialized = true;
     }
@@ -116,7 +129,8 @@ public:
     }
 
 public:
-    Event::RoutingEventsPrx mEventTopic;
+    Event::RoutingEventsPrx mEventTopic; // Using one-way proxy. 
+    boost::mutex mLock;
 
 private:
     Ice::ObjectAdapterPtr mAdapter;
@@ -137,9 +151,12 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
 void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -160,9 +177,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
     const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -181,9 +201,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
 void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -203,9 +226,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
     const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -223,9 +249,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
  */
 void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
 {
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
@@ -243,9 +272,12 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
  */
 void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
 {
-    if (!mImpl->isInitialized())
-    {
-        return;
+    { // scope for the lock
+        boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+        if (!mImpl->isInitialized())
+        {
+            return;
+        }
     }
 
     try
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index b50b711..2e28b94 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -13,20 +13,26 @@
  * the GNU General Public License Version 2. See the LICENSE.txt file
  * at the top of the source tree.
  */
+#include <boost/shared_ptr.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/thread/shared_mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
 
 #include "SessionRouter.h"
 #include "EndpointRegistry.h"
 #include "RoutingIf.h"
 #include "EndpointIf.h"
 #include "logger.h"
+#include "WorkQueue.h"
 
+using namespace AsteriskSCF;
 using namespace AsteriskSCF::Core::Routing::V1;
 using namespace AsteriskSCF::Core::Endpoint::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::Core::Routing::V1::Event;
+using namespace AsteriskSCF::Core::Routing::V1;
 using namespace std;
 
 namespace
@@ -101,7 +107,8 @@ private:
     size_t mMaxRetries;
     size_t mRetryIntervalMilliseconds;
     size_t mCounter;
-};
+
+}; // class RetryPolicy
 
 /**
  * Listener used to monitor sessions during the routing process. Primarily used to 
@@ -270,18 +277,18 @@ private:
     SessionSeq mSessions;
     bool mTerminated;
     SessionListenerPrx mListenerPrx;
-};
+
+}; // class SessionListenerImpl
+
 typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
 
 /**
- * This class uses RAII to manage the lifecycle of a session listener.
- * It's sort of a smart pointer for the listener, but it's tightly
- * coupled to the specifics of our private impl.
+ * This class manages the lifecycle of a session listener.
  */
-class SessionListenerAllocator
+class SessionListenerManager
 {
 public:
-    SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
+    SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
         : mSessionListener(new SessionListenerImpl()),
           mAdapter(adapter)
     {
@@ -306,7 +313,7 @@ public:
         }
     }
 
-    SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
+    SessionListenerManager(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
         : mSessionListener(new SessionListenerImpl()),
           mAdapter(adapter)
     {
@@ -334,7 +341,7 @@ public:
         }
     }
 
-    ~SessionListenerAllocator()
+    ~SessionListenerManager()
     {
         // Our private SessionListener implementation adds itself as a servant. It
         // can't really undo that without getting itself deleted. So undo it
@@ -353,7 +360,7 @@ public:
         try
         {
             // Only the adapter holds a smart pointer for this servant, so this will
-            // cause it to be delted.
+            // cause it to be deleted.
             lg(Debug) << "Removing listener from object adapter." ;
             mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
         }
@@ -363,7 +370,7 @@ public:
         }
     }
 
-    SessionListenerImpl* operator->()
+    SessionListenerImpl* getListener() const
     {
         return mSessionListener;
     }
@@ -371,65 +378,191 @@ public:
 private:
     SessionListenerImpl *mSessionListener;
     Ice::ObjectAdapterPtr mAdapter;
+
+}; // class SessionListenerManager
+
+typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
+
+/**
+ * Context required by all of the SessionRouter operations. 
+ *  All of the items in the SessionContext provide thread-safe interfaces. (i.e. no lock required). 
+ */
+struct SessionContext
+{
+public:
+    /**
+     * Constructor. The BridgeManager isn't initialized. It's configured later via a setter
+     * due to component initialization sequence. 
+     */
+    SessionContext(const Ice::ObjectAdapterPtr& adapter,
+                            const EndpointRegistryPtr& registry,
+                            const RoutingEventsPtr& publisher,
+                            const boost::shared_ptr<WorkQueue>& workQueue)
+                                  :  adapter(adapter),
+                                     endpointRegistry(registry),
+                                     eventPublisher(publisher),
+                                     workQueue(workQueue)
+    {
+    }
+
+    Ice::ObjectAdapterPtr adapter;
+    EndpointRegistryPtr endpointRegistry;
+    RoutingEventsPtr eventPublisher;
+    boost::shared_ptr<WorkQueue> workQueue;
+    AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
+};
+
+/** 
+ * An interface for an object that manages a collection of SessionRouterOperations. 
+ */
+class OperationsManager
+{
+public:
+    /**
+     * Operations indicate they are finished by calling this method.
+     */
+    virtual void finished(WorkQueue::Work *) = 0;
+
+    /**
+     * Operations can reschedule themselves on the WorkQueue. 
+     */
+    virtual void reschedule(WorkQueue::Work *) = 0;
+
+protected:
+    OperationsManager() {}  // Can't construct directly. This class is an interface. 
 };
 
+// Forward-declaration
+template <typename T> class LookupCallback;
+
 /**
- * Private operations and state of the SessionRouter. 
+ * This is a base class for worker objects that offload SessionRouter operations 
+ * to a worker thread during an AMD invocation. It implements the WorkQueue::Work
+ * interface so that it can be enqueued to a worker thread or thread pool. 
+ * The template parameter T is the AMD callback type for the 
+ * particular operation. 
  */
-class SessionRouterPriv
+template<typename T>
+class SessionRouterOperation : public WorkQueue::Work
 {
 public:
-    SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
-        const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
-        mAdapter(objectAdapter),
-        mEndpointRegistry(endpointRegistry),
-        mEventPublisher(eventPublisher)
+    /**
+     * Constructor. 
+     *  @param amdCallback The callback object to provide results to the initiator of this operation.
+     *  @param context The SessionContext provides references to key objects needed by each operation. 
+     *  @param manager The OperationsManager notified when this operation finishes or reschedules itself.
+     */
+    SessionRouterOperation(const T& amdCallback,
+                           const SessionContext& context,
+                           OperationsManager* manager,
+                           const boost::function<void ()> &initialStateHandler) 
+       : mInitiatorCallback(amdCallback),
+         mSessionContext(context),
+         mFinished(false),
+         mOperationsManager(manager),
+         mCurrentStateHandler(initialStateHandler)
     {
     }
 
-    ~SessionRouterPriv()
+    virtual ~SessionRouterOperation() 
+    {
+    }
+
+    /** 
+     * An implementation of the WorkQueue::Work interface. 
+     */
+    virtual void doWork()
     {
+        mCurrentStateHandler();
     }
 
     /**
-     * Set the accessor to the bridge.
+     * Inform initiator that this operation finished with 
+     * the specified exception. 
      */
-    void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
-            SessionCommunications::V1::BridgeManagerPrx>&  bridgeAccessor)
+    void finishWithException(const ::std::exception& e)
     {
-        mBridgeManager = bridgeAccessor;
+        // Forward to this operation's initiator.
+        mInitiatorCallback->ice_exception(e);
+        finish();
     }
 
     /**
-     * Do a lookup of the requested endpoint.
+     * Inform initiator that this operation finished with 
+     * an unspecified exception. 
      */
-    EndpointSeq lookupEndpoints(const std::string& destination, const Ice::Current& current)
+    void finishWithException()
     {
-        EndpointSeq endpoints;
-        try
-        {
-            // Lookup the destination.
-            endpoints = mEndpointRegistry->lookup(destination, current);
+        mInitiatorCallback->ice_exception();
+        finish();
+    }
 
-            if (endpoints.empty())
-            {
-                throw DestinationNotFoundException(destination);
-            }
-        }
-        catch (const DestinationNotFoundException&)
+    /**
+     * Inform initiator that this operation finished successfully
+     * by sending the response through the AMD callback.
+     */
+    void finishAndSendResult()
+    {
+        mInitiatorCallback->ice_response();
+        finish();
+    }
+    
+    /**
+     * This operation is called via a LookupCallback as a result of AMI calls 
+     * to the SessionManagers' EndpointLocators. 
+     */
+    void setLookupResult(const EndpointSeq& endpoints)
+    {
+        mLookupResult = endpoints;
+
+        // Reschedule this operation to complete.
+        try
         {
-            // rethrow
-            throw;
+            mOperationsManager->reschedule(this);
         }
-        catch (const Ice::Exception &)
+        catch(const Ice::Exception& e)
         {
-            // Probably couldn't access the EndpointLocator of the registered channel.
-            throw EndpointUnreachableException(destination);
+            finishWithException(e);
         }
+    }
+
+protected: // These protected operations are utility functions. 
+
+    /**
+     * Common completion code. 
+     */
+    void finish()
+    {
+        // Mark internal state as finished. 
+        mFinished = true;
 
-        return endpoints;
+        // Inform our container that we are complete. 
+        mOperationsManager->finished(this);
     }
 
+    /**
+     * Initiate a lookup of the requested endpoint.
+     *  @param destination Destination to be looked up.
+     *  @param current The Ice::Current reference. 
+     */
+    void lookupEndpoints(const std::string& destination, const ::Ice::Current current)
+    {
+        try
+        {
+            // This component's own lookup interface is implemented as AMD. 
+            // We provide our override of the appropriate AMD callback. 
+            AMD_EndpointLocator_lookupPtr lookupCallback;
+
+            lookupCallback = new LookupCallback<T>(this);
+
+            // Lookup the destination.
+            mSessionContext.endpointRegistry->lookup_async(lookupCallback, destination, current);
+        }
+        catch (...)
+        {
+            finishWithException();
+        }
+    }
 
     /**
      * Forward the start() operation to all sessions in a given sequence. 
@@ -445,8 +578,7 @@ public:
             catch (const Ice::Exception &e)
             {
                 lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what();
-                // TBD... probably other bridge cleanup needs to be done.
-                throw;
+                finishWithException(e);
             }
         }
     }
@@ -456,7 +588,7 @@ public:
      */
     BridgePrx getBridge(SessionPrx session)
     {
-        BridgePrx result(0);
+        BridgePrx result;
 
         RetryPolicy policy(5, 500);
         while(policy.canRetry())
@@ -466,23 +598,26 @@ public:
                 result = session->getBridge();
                 break;
             }
-            catch(const Ice::ConnectionLostException&)
+            catch(const Ice::ConnectionLostException& cle)
             {
                 if(!policy.retry())
                 {
                     lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed "  << policy.maxRetries() << " retries." ;
-                    throw;
+                    finishWithException(cle);
+                    throw cle; 
                 }
             }
             catch(const NotBridged& e)
             {
                 lg(Error) << "getBridge(): session is not bridged." ;
-                throw e; // rethrow
+                finishWithException(e); 
+                throw e;
             }
             catch(const Ice::Exception& e)
             {
                 lg(Error) << "getBridge(): Ice exception getting bridge for session:"  << e.what();
-                throw e; // rethrow
+                finishWithException(e); 
+                throw e;
             }
         }
 
@@ -493,7 +628,7 @@ public:
      * Create a session to each of a given set of endpoints, and return a collection of the 
      * newly added sessions. 
      */
-    SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+    SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination)
     {
         // Add a session
         SessionSeq newSessions;
@@ -505,21 +640,22 @@ public:
 
                 // Create a session on the destination.
                 lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
-                SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
+                SessionPrx destSession = sessionEndpoint->createSession(destination, mListenerManager->getListener()->getProxy());
                 if(!destSession)
                 {
                     lg(Debug) << " Session endpoint returned a null proxy, continuing with other endpoints";
                     continue;
-                }
+		}
+
                 lg(Debug) << "  Session proxy: " << destSession->ice_toString() ;
 
-                listener->addSession(destSession);
+                mListenerManager->getListener()->addSession(destSession);
                 newSessions.push_back(destSession);
             }
             catch(const Ice::Exception &exception)
             {
                 lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
-                // We may be able to reach SOME of the endpoints.
+                // We may be able to reach SOME of the endpoints. 
             }
         }
         return newSessions;
@@ -528,20 +664,35 @@ public:
     /**
      * Accessor for the sessions in a bridge.
      *   @bridge The bridge whose sessions are to be accessed.
-     *   @except An optional session proxy to be excluded from the list of sessions. 
      */
-    SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except=0)
+    SessionSeq getSessionsInBridge(const BridgePrx& bridge)
+    {
+        SessionSeq allSessions;
+        try
+        {
+            allSessions = bridge->listSessions();
+        }
+        catch(const Ice::Exception &e)
+        {
+            lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
+            finishWithException(e); 
+
+        }
+        return allSessions;
+    }
+
+    /**
+     * Accessor for the sessions in a bridge.
+     *   @bridge The bridge whose sessions are to be accessed.
+     *   @except Session proxy to be excluded from the list of sessions. 
+     */
+    SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except)
     {
         SessionSeq sessions; 
         try
         {
             SessionSeq allSessions = bridge->listSessions();
 
-            if (except == 0)
-            {
-                return allSessions;
-            }
-
             for(SessionSeq::iterator s = allSessions.begin(); s !=allSessions.end(); ++s)
             {
                 if (except->ice_getIdentity() != (*s)->ice_getIdentity())
@@ -553,7 +704,7 @@ public:
         catch(const Ice::Exception &e)
         {
             lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
-            throw e; // rethrow
+            finishWithException(e); 
         }
         return sessions;
     }
@@ -588,225 +739,644 @@ public:
         return removedSessions;
     }
 
-public:
-    Ice::ObjectAdapterPtr mAdapter;
-    EndpointRegistryPtr mEndpointRegistry;
-    RoutingEventsPtr mEventPublisher;
-    AsteriskSCF::SmartProxy::SmartProxy<
-        SessionCommunications::V1::BridgeManagerPrx> mBridgeManager;
-};
+    void setState(const boost::function<void ()>& stateHandler, std::string stateName)
+    {
+        lg(Debug) << "Operation setting new state handler " << stateName;
+        mCurrentStateHandler = stateHandler; 
+    }
 
-SessionRouter::SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
-    const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
-    mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher))
-{
-}
+protected:
+    T mInitiatorCallback;
+    SessionContext mSessionContext;
+    WorkQueue::PoolId mPoolId;
 
-SessionRouter::~SessionRouter()
-{
-    mImpl.reset();
-}
+    bool mFinished;
+    EndpointSeq mLookupResult;
+    SessionListenerManagerPtr mListenerManager;
+    OperationsManager* mOperationsManager;
 
-void SessionRouter::setBridgeManager(
-    const AsteriskSCF::SmartProxy::SmartProxy<
-        SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
-{
-    mImpl->mBridgeManager = bridgeAccessor;
-}
+private:
+    boost::function<void ()> mCurrentStateHandler;     // Lightweight state machine. Current state handles doWork() for a given state.
+
+}; // class SessionRouterOperation
 
 /**
- * Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
- *   TBD - Need to rework with asynch support.
+ * This is a specialization of the SessionRouterOperation<T> to handle the
+ * routeSession() operation. The template parameter T is the type of the routeSession()
+ * AMD callback handler to allow this object to send results to the initiator of this
+ * operation. 
+ * 
+ * This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread. 
  */
-void SessionRouter::routeSession(
-    const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
-    const std::string& destination,
-    const Ice::Current& current)
+class  RouteSessionOperation : public SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>
 {
-    lg(Debug) << "routeSession() entered with destination " << destination ;
+public:
+    RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener) 
+         : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>(cb, 
+                                  context, 
+                                  listener, 
+                                  boost::bind(&RouteSessionOperation::lookupState, this)),
+           mInitiatorCallback(cb),
+           mSource(source),
+           mDestination(destination),
+           mIceCurrent(current)
+    {
+    }
 
-    if (!mImpl->mBridgeManager.initializeOnce())
+    virtual ~RouteSessionOperation() 
     {
-        lg(Error) << "No proxy to BridgeManager.  "
-            "Make sure all services are running.";
-        throw BridgingException(source->getEndpoint()->getId(), destination);
+         lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
     }
 
-    // Create a listener for the source to handle early termination.
-    // The wrapper we're using will remove the listener and free it when
-    // this method is left.
-    SessionListenerAllocator listener(mImpl->mAdapter, source);
+private:
+    /**
+     * We start routing the session by looking up the endpoint of the destination. 
+     *
+     * This method is called via mCurrentStateHandler when doWork() is executed from the 
+     * WorkQueue. 
+     */
+    void lookupState()
+    {
+        lg(Debug) << "routeSession() entered with destination " << mDestination ;
+
+        if (!mSessionContext.bridgeManager.initializeOnce())
+        {
+            lg(Error) << "No proxy to BridgeManager.  "
+                "Make sure all services are running.";
 
-    // Route the destination
-    lg(Debug) << "routeSession(): Routing destination " << destination;
-    EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+            finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+            return;
+        }
 
-    // Add a session to the endpoints.
-    SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+        // Create a listener for the source to handle early termination.
+        // The manager is kept in mListenerManager so the listener remains
+        // available to the later states of this operation.
+        SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSource));
+        mListenerManager = listener;
 
-    if (listener->getNumSessions() < 2)
+        // Set the state handler to execute once we've looked up our endpoints.
+        setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+
+        // Lookup the destination. This will use AMI, and the callback will 
+        // schedule us to execute again. 
+        lg(Debug) << "routeSession(): Routing destination " << mDestination;
+        lookupEndpoints(mDestination, mIceCurrent);
+    }
+
+    /**
+     * Entering this state, the destination endpoint has been obtained. This state
+     * completes the operation by creating the bridge. 
+     * 
+     * This method is called via mCurrentStateHandler when doWork() is executed from the 
+     * WorkQueue. 
+     */
+    void establishBridgeState()
+    {
+        if (mFinished)
+        {
+            return;
+        }
+
+        assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed. 
+        if (mLookupResult.size() < 1)
+        {
+            finishWithException(DestinationNotFoundException(mDestination));
+            return;
+        }
+
+        // Add a session to the endpoints.
+        SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
+
+        if (mListenerManager->getListener()->getNumSessions() < 2)
+        {
+            finishWithException(SessionCreationException(mDestination));
+            return;
+        }
+
+        if (mListenerManager->getListener()->isTerminated())
+        {
+            finishWithException(SourceTerminatedPreBridgingException(mSource->getEndpoint()->getId()));
+            return;
+        }
+
+        // We're through listening, and we will probably interfere with the
+        // Bridge's functionality if we keep listening.
+        mListenerManager->getListener()->unregister();
+
+        // Create the bridge
+        BridgePrx bridge;
+        try
+        {
+            SessionSeq bridgedSessions;
+            bridgedSessions.push_back(mSource);
+
+            bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
+            bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+
+            lg(Debug) << "routeSession(): Creating bridge.";
+            bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+        }
+        catch (const Ice::Exception &e)
+        {
+            lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+
+            finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+            return;
+        }
+
+        // Forward the start to all the destinations routed to.
+        lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
+        forwardStart(newSessions);
+
+        // This operation is complete. Send AMD responses. 
+        finishAndSendResult();
+    }
+
+private:
+    // Operation input params. 
+    AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
+    SessionPrx mSource;
+    string mDestination;
+    ::Ice::Current mIceCurrent;
+    
+}; // class RouteSessionOperation
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable by the 
+ * destination param. This is a specialization of  SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type 
+ * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ * 
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+ class  ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+{
+public:
+    ConnectBridgedSessionsWithDestinationOperation(const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener)
+         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>(cb, 
+                                  context, 
+                                  listener, 
+                                  boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)),
+           mInitiatorCallback(cb),
+           mSessionToReplace(sessionToReplace),
+           mDestination(destination),
+           mIceCurrent(current)
     {
-        throw SessionCreationException(destination);
     }
 
-    if (listener->isTerminated())
+    virtual ~ConnectBridgedSessionsWithDestinationOperation() 
     {
-        throw SourceTerminatedPreBridgingException(source->getEndpoint()->getId());
+         lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
     }
 
-    // We're through listening, and we will probably interfere with the
-    // Bridge's functionality if we keep listening.
-    listener->unregister();
+private:
 
-    // Create the bridge
-    BridgePrx bridge;
-    try
+    void lookupState()
     {
-        SessionSeq bridgedSessions;
-        bridgedSessions.push_back(source);
+        lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << mDestination;
+
+        mBridge = mSessionToReplace->getBridge();
+
+        mRemainingSessions = getSessionsInBridge(mBridge, mSessionToReplace);
+
+        // Create a listener for the sessions not being replaced to handle early termination.
+        // The manager is kept in mListenerManager so the listener remains
+        // available to the later states of this operation.
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
+        SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
+        mListenerManager = listener;
 
-        bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
-        bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+        // Route the destination
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
 
-        lg(Debug) << "routeSession(): Creating bridge.";
-        bridge = mImpl->mBridgeManager->createBridge(bridgedSessions, 0);
+        // Set the state to execute after lookup.
+        setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+       // mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
+
+        // Lookup the destination. This will use AMI, and the callback should 
+        // schedule us to execute again. 
+        lg(Debug) << "routeSession(): Routing destination " << mDestination;
+        lookupEndpoints(mDestination, mIceCurrent);
     }
-    catch (const Ice::Exception &e)
+
+    /**
+     * Entering this state, the destination endpoint has been obtained. 
+     * This operation is invoked from the worker thread, having been scheduled by
+     * the callback from lookup. 
+     */
+    void establishBridgeState()
     {
-        lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
-        listener->unregister();
+        if (mFinished)
+        {
+            return;
+        }
+
+        assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed. 
+        if (mLookupResult.size() < 1)
+        {
+            finishWithException(DestinationNotFoundException(mDestination));
+            return;
+        }
+
+        // Add a session 
+        SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
+
+        if (mListenerManager->getListener()->getNumSessions() < 2)
+        {
+            lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << mDestination << " in connectBridgedSessionsWithDestination(). " ;
+            finishWithException(SessionCreationException(mDestination));
+            return;
+        }
+
+        if (mListenerManager->getListener()->isTerminated())
+        {
+            lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
+            finishWithException( SourceTerminatedPreBridgingException(mRemainingSessions[0]->getEndpoint()->getId()));
+            return;
+        }
+
+        // We're through listening, and we will probably interfere with the Bridge's functionality if
+        // we keep listening.
+        mListenerManager->getListener()->unregister();
+
+        // Modify the bridge
+        try
+        {
+            lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << mDestination;
+            mBridge->replaceSession(mSessionToReplace, newSessions);
+        }
+        catch (const Ice::Exception &e)
+        {
+            lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
+            finishWithException(BridgingException(mRemainingSessions[0]->getEndpoint()->getId(), mDestination));
+            return;
+        }
+
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
+        forwardStart(newSessions);
 
-        throw BridgingException(source->getEndpoint()->getId(), destination);
+        // This operation is complete. Send AMD responses. 
+        finishAndSendResult();
     }
 
+private:
+    // Operation input params. 
+    AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
+    SessionPrx mSessionToReplace;
+    string mDestination;
+    ::Ice::Current mIceCurrent;
 
-    // Forward the start to all the destinations routed to.
-    lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
-    mImpl->forwardStart(newSessions);
+    // Implementation state
+    BridgePrx mBridge;
+    SessionSeq mRemainingSessions;
 
-} // SessionRouter::routeSession(...)
+}; // class ConnectBridgedSessionsWithDestinationOperation
 
 /**
- * Replace one session in a Bridge with a new
- * session routable by the destination param.
- *   @param source The session initiating the routing event.
- *   @param destination The address or id of the destination to be routed.
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of 
+ * the connectBridgedSessions() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
  */
-void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sessionToReplace,
-    const ::std::string& destination,
-    const Ice::Current& current)
+class  ConnectBridgedSessionsOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>
 {
-    lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination;
+public:
+    ConnectBridgedSessionsOperation(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener)
+         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>(cb, 
+                                             context, 
+                                             listener, 
+                                             boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)),
+           mInitiatorCallback(cb),
+           mSessionToReplace(sessionToReplace),
+           mBridgedSession(bridgedSession),
+           mIceCurrent(current)
+    {
+    }
+
+    virtual ~ConnectBridgedSessionsOperation() 
+    {
+         lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
+    }
+
+private:
+    /**
+     * Replace one session in a Bridge with sessions from another bridge.
+     */
+    void connectBridgedSessionsState()
+    {
+        lg(Debug) << "connectBridgedSessions() entered... ";
 
-    BridgePrx bridge(sessionToReplace->getBridge());
+        // Get the bridge being merged into.
+        BridgePrx mergeBridge = getBridge(mSessionToReplace);
 
-    SessionSeq remainingSessions = mImpl->getSessionsInBridge(bridge, sessionToReplace);
+        SessionSeq preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
 
-    // Create a listener for the sessions not being replaced to handle early termination.
-    // The wrapper we're using will remove the listener and free it when
-    // this method is left.
-    lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
-    SessionListenerAllocator listener(mImpl->mAdapter, remainingSessions);
+        // Create a listener for the sessions not being replaced to handle early termination.
+        lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
+        SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+        mListenerManager = listener;
 
-    // Route the destination
-    lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << destination;
-    EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+        // Get the bridge for the sessions being moved.
+        BridgePrx oldBridge = getBridge(mBridgedSession);
 
-    // Add a session 
-    SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+        SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
 
-    if (listener->getNumSessions() < 2)
+        // Check for early termination by the source.
+        if (mListenerManager->getListener()->isTerminated())
+        {
+            lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
+            finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
+            return;
+        }
+
+        // We're through listening, and we will probably interfere with the Bridge's functionality if
+        // we keep listening.
+        lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
+        mListenerManager->getListener()->unregister();
+
+        // Now replace the sessions.
+        try
+        {
+            lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
+            mergeBridge->replaceSession(mSessionToReplace, migratingSessions);
+        }
+        catch(const Ice::Exception& e)
+        {
+            lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
+            finishWithException(e); // rethrow
+            return;
+        }
+
+        // This operation is complete. Send AMD responses. 
+        finishAndSendResult();
+    }
+
+private:
+    // Operation input params. 
+    AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
+    SessionPrx mSessionToReplace;
+    SessionPrx mBridgedSession;
+    ::Ice::Current mIceCurrent;
+
+}; // class ConnectBridgedSessionsOperation
+
+
+/** 
+ * An implementation of the AMD_EndpointLocator_lookup callback so 
+ * that we can call our own lookup operation. 
+ * Note that we're not really dispatching via AMD, but we're using the same
+ * AMD implementation that other components would use to do a lookup(). 
+ */
+template <typename T>
+class LookupCallback : public AMD_EndpointLocator_lookup
+{
+public:
+    LookupCallback(SessionRouterOperation<T>* operation)
+        : mOperation(operation)
     {
-        lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << destination << " in connectBridgedSessionsWithDestination(). " ;
-        throw SessionCreationException(destination);
     }
 
-    if (listener->isTerminated())
+    ~LookupCallback()
     {
-        lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
-        throw SourceTerminatedPreBridgingException(remainingSessions[0]->getEndpoint()->getId());
+        lg(Debug) << "LookupCallback destroyed."; 
     }
-    // We're through listening, and we will probably interfere with the Bridge's functionality if
-    // we keep listening.
-    listener->unregister();
 
-    // Modify the bridge
-    try
+public: // Overrides. 
+
+    virtual void ice_exception(const ::std::exception& e)
     {
-        lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << destination;
-        bridge->replaceSession(sessionToReplace, newSessions);
+        mOperation->finishWithException(e);
     }
-    catch (const Ice::Exception &e)
+
+    virtual void ice_exception()
     {
-        lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
-        throw BridgingException(remainingSessions[0]->getEndpoint()->getId(), destination);
+        mOperation->finishWithException();
     }
 
-    lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
-    mImpl->forwardStart(newSessions);
+    virtual void ice_response(const ::AsteriskSCF::Core::Endpoint::V1::EndpointSeq& endpoints)
+    {
+        mOperation->setLookupResult(endpoints);
+    }
 
-} // SessionRouter::connectBridgedSessionsWithDestination(...)
+private:
+    SessionRouterOperation<T>* mOperation;
+};
 
+typedef map<WorkQueue::Work*, boost::shared_ptr<WorkQueue::Work> > OperationMap;
 
 /**
- * Replace one session in a Bridge with sessions from another bridge.
- * No routing is actually performed. This operation exists here for consistency,
- * since connectBridgedSessionsWithDestination(...) is implemented by this interface.
- * @param sessionToReplace The session that is to be replaced in a
- *   bridge. The bridge obejct associated with this session will survive, and
- *   all sessions bridged to this session will be kept in the bridge.
- * @param bridgedSession This session is assumed to be bridged to the sessions
- *   that are to be moved to another bridge. The bridgedSession itself will not
- *   be connected to the other bridge. The sessions being moved will be removed from
- *   their current bridge before being added to the bridge currenltly attached to
- *   sessionToReplace.
+ * Private operations and state of the SessionRouter. 
  */
-void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
-    const SessionPrx& bridgedSession,
-    const Ice::Current&)
+class SessionRouterPriv : public OperationsManager
 {
-    lg(Debug) << "connectBridgedSessions() entered... ";
+public:
+    SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter, 
+                      const EndpointRegistryPtr& endpointRegistry,
+                      const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+                      const boost::shared_ptr<WorkQueue>& workQueue) :
+                         mSessionContext(objectAdapter,
+                                  endpointRegistry,
+                                  eventPublisher,
+                                  workQueue)
+    {
+    }
 
-    // Get the bridge being merged into.
-    BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
+    ~SessionRouterPriv()
+    {
+    }
 
-    SessionSeq preserveSessions = mImpl->getSessionsInBridge(mergeBridge, sessionToReplace);
+    /**
+     * Set the accessor to the bridge. 
+     */
+    void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
+            SessionCommunications::V1::BridgeManagerPrx>&  bridgeAccessor)
+    {
+        mSessionContext.bridgeManager = bridgeAccessor;
+    }
 
-    // Create a listener for the sessions not being replaced to handle early termination.
-    // The wrapper we're using will remove the listener and free it when
-    // this method is left.
-    lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
-    SessionListenerAllocator listener(mImpl->mAdapter, preserveSessions);
+    /**
+     * Enqueue the work to the WorkQueue. 
+     */
+    void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
+    {
+        // Maintain refs to all ongoing operations. 
+        mOngoingOperations[work.get()] = work;
 
-    // Get the bridge for the sessions being moved.
-    BridgePrx oldBridge = mImpl->getBridge(bridgedSession);
+        // Enqueue work. 
+        mSessionContext.workQueue->enqueue(work);
+    }
 
-    SessionSeq migratingSessions = mImpl->removeSessionsFromBridge(oldBridge, bridgedSession);
+public: // Overrides
 
-    // Check for early termination by the source.
-    if (listener->isTerminated())
+    /**
+     * Handle a notice from an operation that it has completed. 
+     * Remove our shared_ptr reference so that it will die. 
+     */
+    virtual void finished(WorkQueue::Work* op)
     {
-        lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
-        throw SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId());
+        boost::lock_guard<boost::mutex> guard(mLock);
+        OperationMap::iterator kvp = mOngoingOperations.find(op);
+
+        if (kvp != mOngoingOperations.end())
+        {
+            lg(Debug) << "Removing reference to finished opeation.";
+            mOngoingOperations.erase(kvp);
+        }
     }
-    // We're through listening, and we will probably interfere with the Bridge's functionality if
-    // we keep listening.
-    lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
-    listener->unregister();
 
-    // Now replace the sessions.
-    try
+    /**
+     * Handle an operation's need to reschedule itself.
+     * The operation doesn't have a shared_ptr to itself, so
+     * it can't do it internally. 
+     */
+    virtual void reschedule(WorkQueue::Work *op)
     {
-        lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
-        mergeBridge->replaceSession(sessionToReplace, migratingSessions);
+        mSessionContext.workQueue->enqueue(getOngoingOperationSharedPointer(op));
     }
-    catch(const Ice::Exception& e)
+
+private:
+    /**
+     * Find our shared_ptr for a given Work object raw pointer.
+     * The lookup is performed while holding mLock.
+     *   @param operation Raw pointer to an operation expected to be in mOngoingOperations.
+     *   @return The shared_ptr stored in mOngoingOperations for that operation.
+     *   @throws Ice::UnknownException if the operation is not in the map
+     *     (builds with assertions enabled assert first).
+     */
+    boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
     {
-        lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
-        throw e; // rethrow
+        boost::lock_guard<boost::mutex> guard(mLock);
+        OperationMap::iterator kvp = mOngoingOperations.find(operation);
+
+        assert(kvp != mOngoingOperations.end());
+        if (kvp == mOngoingOperations.end())
+        {
+            // Ice local exceptions take (file, line) as their first two arguments; the
+            // description belongs in the trailing 'unknown' argument, not in the file-name slot.
+            throw Ice::UnknownException(__FILE__, __LINE__,
+                "SessionRouterPriv: Failed finding shared_ptr for SessionRouter operation.");
+        }
+
+        return kvp->second;
     }
 
-} // SessionRouter::connectBridgedSessions(...)
+public:
+    SessionContext mSessionContext;
+    OperationMap mOngoingOperations;
+    boost::mutex mLock;
+};
+
+/**
+ * The SessionRouter implementation.
+ *
+ * The constructor does nothing except allocate the private implementation
+ * (SessionRouterPriv) and forward all four arguments to it unchanged.
+ *   @param objectAdapter Forwarded to SessionRouterPriv.
+ *   @param endpointRegistry Forwarded to SessionRouterPriv.
+ *   @param eventPublisher Forwarded to SessionRouterPriv.
+ *   @param workQueue Forwarded to SessionRouterPriv; presumably the queue the
+ *     routing operations are enqueued on -- TODO confirm against SessionRouterPriv.
+ */
+SessionRouter::SessionRouter(
+                  const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
+                  const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+                  const boost::shared_ptr<WorkQueue>& workQueue) 
+            : mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher, workQueue))
+{
+}
+
+/**
+ * Releases this object's reference to the private implementation.
+ */
+SessionRouter::~SessionRouter()
+{
+    mImpl.reset();
+}
+
+/**
+ * The BridgeManager proxy can only be obtained once our object adapter is activated, so our
+ * bridgeManager reference's initialization is deferred.
+ *   @param bridgeAccessor SmartProxy for the BridgeManager; stored in the
+ *     session context held by the private implementation.
+ */
+void SessionRouter::setBridgeManager(
+    const AsteriskSCF::SmartProxy::SmartProxy<
+        SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+{
+    mImpl->mSessionContext.bridgeManager = bridgeAccessor;
+}
+
+/**
+ * Route the session by looking up the destination endpoint and configuring a complementary session for the destination.
+ * This entry point only wraps its arguments in a RouteSessionOperation and hands
+ * it to mImpl->scheduleOperation(); no routing work is done in this function itself.
+ *   @param cb The AMD callback object, passed to the operation (presumably completed by it -- not visible here).
+ *   @param source The session passed to the operation as the routing source.
+ *   @param destination The destination string passed to the operation.
+ *   @param current The Ice invocation context, passed to the operation.
+ */
+void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                                       const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                                       const ::std::string& destination, 
+                                       const ::Ice::Current& current)
+{
+    WorkPtr routeSessionOp(new RouteSessionOperation(cb, 
+                                                   source, 
+                                                   destination, 
+                                                   current, 
+                                                   mImpl->mSessionContext,
+                                                   mImpl.get()));
+    
+
+    mImpl->scheduleOperation(routeSessionOp);
+}
+
+/**
+ * Replace one session in a Bridge with a new
+ * session routable by the destination param.
+ * This entry point only wraps its arguments in a
+ * ConnectBridgedSessionsWithDestinationOperation and hands it to
+ * mImpl->scheduleOperation().
+ *   @param cb The AMD callback object, passed to the operation.
+ *   @param sessionToReplace The session to be replaced in its bridge.
+ *   @param destination The address or id of the destination to be routed.
+ *   @param current The Ice invocation context, passed to the operation.
+ */
+void SessionRouter::connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb, 
+                                                                const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                                const ::std::string& destination, 
+                                                                const ::Ice::Current& current)
+{
+    WorkPtr connectBridgedSessionsWithDestinationOp(new ConnectBridgedSessionsWithDestinationOperation(cb, 
+                                                   sessionToReplace, 
+                                                   destination, 
+                                                   current, 
+                                                   mImpl->mSessionContext,
+                                                   mImpl.get()));
+
+    mImpl->scheduleOperation(connectBridgedSessionsWithDestinationOp);
+
+}
+
+/**
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency,
+ * since connectBridgedSessionsWithDestination(...) is implemented by this interface.
+ * This entry point only wraps its arguments in a ConnectBridgedSessionsOperation
+ * and hands it to mImpl->scheduleOperation().
+ * @param cb The AMD callback object, passed to the operation.
+ * @param sessionToReplace The session that is to be replaced in a
+ *   bridge. The bridge object associated with this session will survive, and
+ *   all sessions bridged to this session will be kept in the bridge.
+ * @param bridgedSession This session is assumed to be bridged to the sessions
+ *   that are to be moved to another bridge. The bridgedSession itself will not
+ *   be connected to the other bridge. The sessions being moved will be removed from
+ *   their current bridge before being added to the bridge currently attached to
+ *   sessionToReplace.
+ * @param current The Ice invocation context, passed to the operation.
+ */
+
+void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb, 
+                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,  
+                                                 const ::Ice::Current& current)
+{
+    WorkPtr connectBridgedSessionsOp(new ConnectBridgedSessionsOperation(cb, 
+                                                   sessionToReplace, 
+                                                   bridgedSession, 
+                                                   current, 
+                                                   mImpl->mSessionContext,
+                                                   mImpl.get()));
+
+    mImpl->scheduleOperation(connectBridgedSessionsOp);
+
+}
+
 
 } // end BasicRoutingService
 } // end AsteriskSCF
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index d955519..014f3ce 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -21,6 +21,7 @@
 #include "SmartProxy.h"
 #include "SessionCommunications/SessionCommunicationsIf.h"
 #include "EndpointRegistry.h"
+#include "WorkQueue.h"
 
 namespace AsteriskSCF
 {
@@ -34,8 +35,10 @@ class SessionRouterPriv;
 class SessionRouter : public AsteriskSCF::SessionCommunications::V1::SessionRouter
 {
 public:
-    SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,  const EndpointRegistryPtr& endpointRegistry,
-        const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher);
+    SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,  
+                  const EndpointRegistryPtr& endpointRegistry,
+                  const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+                  const boost::shared_ptr<WorkQueue>& sessionRouterWorkQueue);
     ~SessionRouter();
 
     void setBridgeManager(
@@ -55,6 +58,11 @@ public:
         const std::string& destination,
         const Ice::Current&);
 
+    virtual void routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                                    const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                                    const ::std::string& destination, 
+                                    const ::Ice::Current&);
+
     /**
      * Replace a session in a bridge with a destination. The desintation will be routed.
      *   @param sessionToReplace The session to be replaced in a bridge. The affected Bridge interface is
@@ -65,6 +73,11 @@ public:
         const ::std::string& destination,
         const Ice::Current&);
 
+    virtual void connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb, 
+                                                             const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                             const ::std::string& destination, 
+                                                             const ::Ice::Current& );
+
     /**
      * Replace a session in a bridge with another session. If the newSession is already participating in a Bridge,
      * it will be removed from it's current bridge prior to be used as a replacement.
@@ -76,6 +89,11 @@ public:
         const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
         const Ice::Current&);
 
+    virtual void connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb, 
+                                              const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                              const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,  
+                                              const ::Ice::Current&);
+
 private:
     boost::shared_ptr<SessionRouterPriv> mImpl;
 };
diff --git a/src/SimpleWorkQueue.cpp b/src/SimpleWorkQueue.cpp
new file mode 100644
index 0000000..fc33deb
--- /dev/null
+++ b/src/SimpleWorkQueue.cpp
@@ -0,0 +1,275 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+/**
+ * A simple Work Queue implementation. On construction, starts an internal thread. 
+ * Work can be enqueued via the thread-safe enqueue() method. All work must implement 
+ * the Work interface.
+ */
+#include <iostream>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
+#include <list>
+
+#include "logger.h"
+
+#include "SimpleWorkQueue.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::System::Logging;
+using namespace boost;
+
+namespace AsteriskSCF
+{
+/**
+ * Private implementation of SimpleWorkQueue. Holds the queue of pending work,
+ * the state flags, the synchronization objects and the worker thread, which is
+ * started by the constructor and runs execute().
+ *
+ * Note that the logger is held by reference, so the Logger passed to the
+ * constructor must outlive this object.
+ */
+class SimpleWorkQueuePriv
+{
+public:
+    /**
+     * @param id Identifier of the queue; copied and used in log messages.
+     * @param logger Logger to write to; stored by reference.
+     */
+    SimpleWorkQueuePriv(const std::string& id, const Logger& logger) 
+         : mLogger(logger),
+           mQid(id),
+           mInitialized(false), 
+           mFinished(false), 
+           mPaused(false),      // runs by default. 
+           mThread(boost::bind(&SimpleWorkQueuePriv::execute, this))
+           
+    {
+        mLogger(Debug) << "SimpleWorkQueue::private_impl constructor called. Queue ID:" << mQid;
+    }
+
+    ~SimpleWorkQueuePriv()
+    {
+        mLogger(Debug) << "SimpleWorkQueue::private_impl desctuctor called. Queue ID:" << mQid;
+    }
+
+    WorkPtr dequeue();
+    WorkPtr waitAndDequeue();
+    void execute();
+    bool isPaused();
+
+    const Logger& mLogger;
+    std::string mQid;
+    bool mInitialized;
+    bool mFinished;
+    bool mPaused;
+    std::list<WorkPtr> mQueue;
+    boost::mutex mQueueMutex;
+    boost::condition mEmptyQueueCondition;
+    boost::mutex mPauseMutex;
+    boost::condition mPauseCondition;
+
+    // Must stay the LAST data member. Members are constructed in declaration
+    // order, and constructing mThread immediately starts execute() on the new
+    // thread. Declaring it after the queue, mutexes and conditions guarantees
+    // those objects exist before the worker can touch them (and that the thread
+    // object is destroyed before them).
+    boost::thread mThread;
+};
+}
+
+/**
+ * Creates the private implementation (whose constructor starts the worker
+ * thread), logs the construction and marks the queue as initialized.
+ *   @param qid Identifier of this queue; used in log messages.
+ *   @param logger Logger to write to. The private implementation keeps a
+ *     reference to it, so it must outlive this queue.
+ */
+SimpleWorkQueue::SimpleWorkQueue(const std::string& qid, const Logger& logger) : mImpl(new SimpleWorkQueuePriv(qid, logger))
+{
+    mImpl->mLogger(Debug) << "SimpleWorkQueue::Constructor() called. Queue ID:" << mImpl->mQid;
+    mImpl->mInitialized = true; 
+}
+
+/**
+ * Calls terminate() and then blocks until the worker thread has been joined.
+ */
+SimpleWorkQueue::~SimpleWorkQueue()
+{
+    mImpl->mLogger(Debug) << "SimpleWorkQueue::Destructor() called. Queue ID:" << mImpl->mQid;
+    terminate();
+
+    // Wait for worker thread to shut down. 
+    mImpl->mThread.join(); // If you don't do this, then the mImpl is trashed and Execute has bad "this" ptr on other thread. 
+}
+
+/**
+ * @return true only if the queue has been initialized and is neither paused
+ *   nor finished.
+ * NOTE(review): the three flags are read here without taking any of the
+ * mutexes -- confirm that this is acceptable for callers on other threads.
+ */
+bool SimpleWorkQueue::isRunning() 
+{ 
+    return (mImpl->mInitialized && !mImpl->mPaused && !mImpl->mFinished);
+}
+
+/**
+ * Pause the SimpleWorkQueue's thread.
+ * This function only sets the paused flag (under mPauseMutex); the worker
+ * thread is presumably expected to observe the flag and wait -- that code is
+ * not visible here.
+ */
+void SimpleWorkQueue::pause()
+{
+    mImpl->mLogger(Info) << "SimpleWorkQueue::Pause called for queue " << mImpl->mQid;
+
+    boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
+    mImpl->mPaused = true;
+}
+
+/**
+ * Resume from a Paused state.
+ * Clears the paused flag under mPauseMutex and notifies every thread waiting
+ * on mPauseCondition.
+ */
+void SimpleWorkQueue::resume()
+{
+    mImpl->mLogger(Info) << "SimpleWorkQueue::Resume called for queue " << mImpl->mQid;
+
+    boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
+    mImpl->mPaused = false;
+    mImpl->mPauseCondition.notify_all();
+}
+
+/** 
+ * Stops this thread from executing. 
+ */
+void SimpleWorkQueue::terminate()
+{
+    mImpl->mLogger(Info) << "SimpleWorkQueue::Terminate called for queue " << mImpl->mQid ;
+
+    mImpl->mFinished = true;
+    mImpl->mPaused = false;
... 350 lines suppressed ...


-- 
team/ken.hunt/route_async_routing.git



More information about the asterisk-scf-commits mailing list