[asterisk-scf-commits] asterisk-scf/release/routing.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri May 6 10:20:23 CDT 2011


branch "master" has been updated
       via  c41eaab05360240a5a46ce507f00fb0891ff5e23 (commit)
       via  88d1d904f5d76d1185f5f0aff98acca062fa47f9 (commit)
       via  d74afeba80c8b862e7021b8378bf38f92565d317 (commit)
      from  ba4ae430a0457506f51fad79931af15a0f921832 (commit)

Summary of changes:
 src/BasicRoutingServiceApp.cpp       |   12 +-
 src/CMakeLists.txt                   |    3 +
 src/EndpointRegistry.cpp             |  250 ++++++++---
 src/EndpointRegistry.h               |    2 +-
 src/RoutingServiceEventPublisher.cpp |   15 +
 src/SessionRouter.cpp                |  870 ++++++++++++++++++++++++++--------
 src/SessionRouter.h                  |   22 +-
 src/SimpleWorkQueue.cpp              |  273 +++++++++++
 src/SimpleWorkQueue.h                |   58 +++
 src/WorkQueue.h                      |   85 ++++
 test/MockEndpointLocator.cpp         |   14 +-
 test/MockEndpointLocator.h           |    2 +-
 12 files changed, 1334 insertions(+), 272 deletions(-)
 create mode 100644 src/SimpleWorkQueue.cpp
 create mode 100644 src/SimpleWorkQueue.h
 create mode 100644 src/WorkQueue.h


- Log -----------------------------------------------------------------
commit c41eaab05360240a5a46ce507f00fb0891ff5e23
Merge: 88d1d90 ba4ae43
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu May 5 09:59:15 2011 -0500

    Merge branch 'master' of gitdepot:asterisk-scf/release/routing
    
    Conflicts:
    	src/CMakeLists.txt

diff --cc src/CMakeLists.txt
index c55afbc,7b25cb3..54a2557
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@@ -14,21 -20,8 +20,11 @@@ asterisk_scf_component_add_file(BasicRo
  asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
  asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
  asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
 +asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
 +asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
 +asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
- 
  asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
  asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
- 
- if(NOT logger_dir)
-    message(FATAL_ERROR "The logger directory could not be found ${logger_dir}")
- endif()
- include_directories(${logger_dir}/include)
- include_directories(${API_INCLUDE_DIR})
- 
- include_directories(${utils_dir}/SmartProxy/include)
- 
  asterisk_scf_component_build_icebox(BasicRoutingService)
  target_link_libraries(BasicRoutingService ${LUA_LIBRARIES})
  target_link_libraries(BasicRoutingService logging-client)

commit 88d1d904f5d76d1185f5f0aff98acca062fa47f9
Merge: d74afeb 5b9b447
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Mon Apr 25 18:18:45 2011 -0500

    Merge branch 'master' of gitdepot:asterisk-scf/release/routing
    
    Conflicts:
    	src/BasicRoutingServiceApp.cpp
    	src/RoutingServiceEventPublisher.cpp
    	src/SessionRouter.cpp

diff --cc src/EndpointRegistry.cpp
index 79b96b6,362d625..33caac4
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@@ -20,9 -22,7 +22,8 @@@
  #include "RoutingServiceEventPublisher.h"
  #include "EndpointRegistry.h"
  #include "ScriptProcessor.h"
- #include "logger.h"
  
 +using namespace ::AsteriskSCF::Core::Endpoint::V1;
  using namespace ::AsteriskSCF::Core::Routing::V1;
  using namespace ::AsteriskSCF::System::Logging;
  using namespace ::AsteriskSCF::Core::Routing::V1::Event;
diff --cc src/RoutingServiceEventPublisher.cpp
index 73cbc18,fa00a8a..be5761a
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@@ -16,10 -16,9 +16,11 @@@
  #include <Ice/Ice.h>
  #include <IceStorm/IceStorm.h>
  
 +#include <boost/thread/mutex.hpp>
 +
+ #include <AsteriskSCF/logger.h>
+ 
  #include "RoutingServiceEventPublisher.h"
- #include "logger.h"
  
  using namespace ::std;
  using namespace ::AsteriskSCF::Core::Routing::V1;
diff --cc src/SessionRouter.cpp
index b160069,f492f80..99f8ea2
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@@ -13,20 -13,16 +13,20 @@@
   * the GNU General Public License Version 2. See the LICENSE.txt file
   * at the top of the source tree.
   */
 +#include <boost/shared_ptr.hpp>
  #include <boost/thread/thread.hpp>
  #include <boost/thread/shared_mutex.hpp>
 +#include <boost/function.hpp>
 +#include <boost/bind.hpp>
  
+ #include <AsteriskSCF/Core/Routing/RoutingIf.h>
+ #include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
+ #include <AsteriskSCF/logger.h>
+ 
  #include "SessionRouter.h"
  #include "EndpointRegistry.h"
- #include "RoutingIf.h"
- #include "EndpointIf.h"
- #include "logger.h"
- #include "WorkQueue.h"
  
 +using namespace AsteriskSCF;
  using namespace AsteriskSCF::Core::Routing::V1;
  using namespace AsteriskSCF::Core::Endpoint::V1;
  using namespace AsteriskSCF::System::Logging;
@@@ -595,10 -487,15 +576,16 @@@ protected: // These protected operation
  
                  // Create a session on the destination.
                  lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
 -                SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
 +                SessionPrx destSession = sessionEndpoint->createSession(destination, mListenerManager->getListener()->getProxy());
+                 if(!destSession)
+                 {
+                     lg(Debug) << " Session endpoint returned a null proxy, continuing with other endpoints";
+                     continue;
+                 }
++
                  lg(Debug) << "  Session proxy: " << destSession->ice_toString() ;
  
 -                listener->addSession(destSession);
 +                mListenerManager->getListener()->addSession(destSession);
                  newSessions.push_back(destSession);
              }
              catch(const Ice::Exception &exception)
diff --cc src/SessionRouter.h
index 014f3ce,6332292..810f09c
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@@ -18,10 -18,10 +18,11 @@@
  #include <Ice/Ice.h>
  #include <boost/shared_ptr.hpp>
  
- #include "SmartProxy.h"
- #include "SessionCommunications/SessionCommunicationsIf.h"
+ #include <AsteriskSCF/SmartProxy.h>
+ #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+ 
  #include "EndpointRegistry.h"
 +#include "WorkQueue.h"
  
  namespace AsteriskSCF
  {

commit d74afeba80c8b862e7021b8378bf38f92565d317
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Mon Dec 20 21:26:08 2010 -0600

    Supporting asynchronous calls in routing service.

diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 3920cc2..f246a22 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,6 +30,7 @@
 #include "EndpointRegistry.h"
 #include "RoutingAdmin.h"
 #include "SessionRouter.h"
+#include "SimpleWorkQueue.h"
 #include "IceLogger.h"
 #include "logger.h"
 
@@ -56,11 +57,14 @@ namespace BasicRoutingService
 class BasicRoutingServiceApp : public IceBox::Service
 {
 public:
-    BasicRoutingServiceApp() :
-        mDone(false), mInitialized(false), mRunning(false)
+    BasicRoutingServiceApp() 
+        : mDone(false), 
+          mInitialized(false), 
+          mRunning(false),
+          mWorkQueue( new SimpleWorkQueue("SessionRouterWorkQueue", lg))
     {
-
     }
+
     ~BasicRoutingServiceApp()
     {
         // Smart pointers do your thing.
@@ -88,6 +92,7 @@ private:
     bool mDone;
     bool mInitialized;
     bool mRunning;
+    boost::shared_ptr<SimpleWorkQueue> mWorkQueue;
 
     std::string mAppName;
     ServiceLocatorManagementPrx mServiceLocatorManagement;
@@ -261,7 +266,7 @@ void BasicRoutingServiceApp::initialize()
         mAdapter->add(mEndpointRegistry, mCommunicator->stringToIdentity(RegistryLocatorObjectId));
 
         // Create publish the SessionRouter interface.
-        SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher));
+        SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher, mWorkQueue));
         BasicSessionRouterPtr basicSessionPtr(rawSessionRouter);
         mSessionRouter = basicSessionPtr;
         mAdapter->add(rawSessionRouter, mCommunicator->stringToIdentity(SessionRouterObjectId));
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04c5eda..1f6833a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -19,6 +19,9 @@ asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.cpp)
 asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
 asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
 asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
+asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
 
 asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
 asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 6eecc72..79b96b6 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -22,6 +22,7 @@
 #include "ScriptProcessor.h"
 #include "logger.h"
 
+using namespace ::AsteriskSCF::Core::Endpoint::V1;
 using namespace ::AsteriskSCF::Core::Routing::V1;
 using namespace ::AsteriskSCF::System::Logging;
 using namespace ::AsteriskSCF::Core::Routing::V1::Event;
@@ -117,6 +118,130 @@ public:
 };
 
 /**
+ * A collector for lookup() operation AMI replies. 
+ */
+class LookupResultCollector : public IceUtil::Shared
+{
+public:
+    /**
+     * Constructor.
+     * @param callback Ice callback.
+     * @param numVotes The number of replies expected; each call to collectResult() or fail() counts as one.
+     */
+    LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback, 
+                          const std::string& destination,
+                          const RoutingEventsPtr& eventPublisher, 
+                          int numVotes) 
+                : mCallback(callback), 
+                  mNumVotes(numVotes),
+                  mEventPublisher(eventPublisher),
+                  mDestination(destination)
+    {
+        assert(mNumVotes >= 0);
+
+        if (mNumVotes == 0)
+        {
+           notifyFailed();
+        }
+    }
+
+    /**
+     * Collect results of AMI lookups from multiple EndpointLocators. 
+     */
+    void collectResult(const EndpointSeq& endpoints)
+    {
+        boost::lock_guard<boost::mutex> guard(mLock);
+
+        if ((endpoints.size() > 0) && mCallback)
+        {
+            mCallback->ice_response(endpoints);
+
+            // clear the mCallback pointer so we only answer once
+            mCallback = 0;
+
+            lg(Debug) << "EndpointRegistry::lookup() found Endpoint for destination " << mDestination;
+          
+            // Post event
+            mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
+        }
+
+        assert(mNumVotes > 0); // isSupported was called too many times
+
+        if (--mNumVotes == 0 && mCallback)
+        {
+            notifyFailed();
+        }
+    }
+
+    void fail(const Ice::Exception &e)
+    {
+        boost::lock_guard<boost::mutex> guard(mLock);
+
+        if (--mNumVotes == 0 && mCallback)
+        {
+            notifyFailed();
+        }
+    }
+
+    void notifyFailed()
+    {
+        DestinationNotFoundException e;
+        mCallback->ice_exception(e);
+
+        // clear the mCallback pointer so we only answer once
+        mCallback = 0;
+
+        // Post event
+        mEventPublisher->lookupEvent(mDestination, Event::FAILURE);
+
+        lg(Debug) << "EndpointRegistry::lookup() failed to find destination " << mDestination;
+    }
+
+private:
+    boost::mutex mLock;
+    AMD_EndpointLocator_lookupPtr mCallback;
+    int mNumVotes;
+    RoutingEventsPtr mEventPublisher;
+    std::string mDestination;
+};
+typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
+
+/**
+ * Callback with the results for EndpointLocator::lookup.  This
+ * implementation simply forwards the info on to a LookupResultCollector.
+ *
+ * @see EndpointLocator::lookup
+ * @see LookupResultCollector
+ */
+class LookupCallback : public IceUtil::Shared
+{
+public:
+    LookupCallback(const LookupResultCollectorPtr& collector) :
+                   mCollector(collector)
+    {
+    }
+
+    void lookupResult(const EndpointSeq& endpoints)
+    {
+        // delegation to thread safe object
+        // no lock needed
+        mCollector->collectResult(endpoints);
+        mCollector = 0;
+    }
+
+    void fail(const Ice::Exception &e)
+    {
+        mCollector->fail(e);
+        mCollector = 0;
+    }
+
+private:
+    LookupResultCollectorPtr mCollector;
+};
+typedef IceUtil::Handle<LookupCallback> LookupCallbackPtr;
+
+
+/**
  * Constructor.
  */
 EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher) :
@@ -124,6 +249,74 @@ EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const Routi
 {
 }
 
+    
+/**
+ * Returns the endpoints that match the specified destination id.
+ *   @param id String identifier of the destination.
+ */
+void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& amdcallback, 
+                                    const ::std::string& destination, 
+                                    const ::Ice::Current&)
+{
+    AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+
+    lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
+
+    string modifiedDestination(destination);
+    if (mImpl->mScriptProcessor.get() != 0)
+    {
+        if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
+        {
+            mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
+
+            lg(Error) << "lookup(): denied by confirmLookup() script.";
+            amdcallback->ice_response(endpoints);
+            return;
+        }
+    }
+
+    std::vector<EndpointLocatorPrx> locatorsToTry;
+
+    // Iterate over all registered EndpointLocators and check their regular expressions against the destination.
+    EndpointLocatorMap locatorMap;
+    mImpl->getEndpointLocatorMapCopy(locatorMap);
+    for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
+    {
+        // Test to see if the destination matches any of this entry's regular expressions.
+        for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
+        {
+            if (boost::regex_match(modifiedDestination, *reg))
+            {
+                lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << " at " << entry->first;
+                locatorsToTry.push_back(entry->second.locator);
+            }
+        }
+    }
+
+    // Create a single results collector for the AMI callbacks to reference. 
+    LookupResultCollectorPtr lookupResultCollector(new LookupResultCollector(amdcallback, 
+                                                                             destination,
+                                                                             mImpl->mEventPublisher,
+                                                                             locatorsToTry.size()));
+
+    // Invoke an AMI lookup on each endpointLocator that might be able to satisfy this lookup. 
+    for(std::vector<EndpointLocatorPrx>::iterator locator = locatorsToTry.begin(); locator != locatorsToTry.end(); ++locator)
+    {
+        // Create our typesafe callback 
+        LookupCallbackPtr callback(new LookupCallback(lookupResultCollector));
+
+        // Wrap our callback for AMI
+        Callback_EndpointLocator_lookupPtr lookupCallback = 
+                 newCallback_EndpointLocator_lookup(callback, 
+                                                     &LookupCallback::lookupResult,
+                                                     &LookupCallback::fail);
+        // Start AMI invocation
+        lg(Debug) << "EndpointRegistry::lookup() invoke a lookup for " << destination;
+        (*locator)->begin_lookup(destination, lookupCallback);
+    }
+
+}
+
 /**
  * Register an EndpointLocator that can provide endpoints.
  *   @param id A unique identifier for the added EndpointLocator.
@@ -248,63 +441,6 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
 }
 
 /**
- * Returns the endpoints that match the specified destination id.
- *   @param id String identifier of the the destination.
- */
-AsteriskSCF::Core::Endpoint::V1::EndpointSeq EndpointRegistry::lookup(const std::string& destination, const Ice::Current&)
-{
-    AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
-
-    lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
-
-    string modifiedDestination(destination);
-    if (mImpl->mScriptProcessor.get() != 0)
-    {
-        if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
-        {
-            mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
-
-            lg(Error) << "lookup(): denied by confirmLookup() script.";
-            return endpoints;
-        }
-    }
-
-    EndpointLocatorMap locatorMap;
-    mImpl->getEndpointLocatorMapCopy(locatorMap);
-
-    for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
-    {
-        // Test to see if the destination matches any of this entry's regular expressions.
-        for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
-        {
-            if (boost::regex_match(modifiedDestination, *reg))
-            {
-                lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << ". Calling remote lookup()";
-
-                try
-                {
-                    endpoints = entry->second.locator->lookup(modifiedDestination);
-                }
-                catch (const IceUtil::Exception& e)
-                {
-                    lg(Error) << "Exception calling registered EndpointLocator for " << entry->first << " Details: " << e.what();
-                }
-                break;
-            }
-        }
-    }
-
-    Event::OperationResult result(Event::FAILURE);
-    if (endpoints.size() > 0)
-    {
-        result = Event::SUCCESS;
-    }
-    mImpl->mEventPublisher->lookupEvent(destination, result);
-
-    return endpoints;
-}
-
-/**
  * Configure this object with a ScriptProcessor.
  */
 void EndpointRegistry::setScriptProcessor(ScriptProcessor* scriptProcessor)
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index d85a015..775338e 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -73,7 +73,7 @@ public:
      * Returns the endpoints that match the specified destination id.
      *   @param id String identifier of the the destination.
      */
-    AsteriskSCF::Core::Endpoint::V1::EndpointSeq lookup(const std::string& destination, const Ice::Current&);
+    virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
 
 public:
 
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 8aae895..73cbc18 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -16,6 +16,8 @@
 #include <Ice/Ice.h>
 #include <IceStorm/IceStorm.h>
 
+#include <boost/thread/mutex.hpp>
+
 #include "RoutingServiceEventPublisher.h"
 #include "logger.h"
 
@@ -117,6 +119,7 @@ public:
 
 public:
     Event::RoutingEventsPrx mEventTopic;
+    boost::mutex mLock;
 
 private:
     Ice::ObjectAdapterPtr mAdapter;
@@ -137,6 +140,8 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
 void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+
     if (!mImpl->isInitialized())
     {
         return;
@@ -160,6 +165,8 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
     const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+
     if (!mImpl->isInitialized())
     {
         return;
@@ -181,6 +188,8 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
 void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+
     if (!mImpl->isInitialized())
     {
         return;
@@ -203,6 +212,8 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
     const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+
     if (!mImpl->isInitialized())
     {
         return;
@@ -223,6 +234,8 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
  */
 void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+
     if (!mImpl->isInitialized())
     {
         return;
@@ -243,6 +256,8 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
  */
 void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
 {
+    boost::lock_guard<boost::mutex> lock(mImpl->mLock); 
+
     if (!mImpl->isInitialized())
     {
         return;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 26b54ad..b160069 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -13,15 +13,20 @@
  * the GNU General Public License Version 2. See the LICENSE.txt file
  * at the top of the source tree.
  */
+#include <boost/shared_ptr.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/thread/shared_mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
 
 #include "SessionRouter.h"
 #include "EndpointRegistry.h"
 #include "RoutingIf.h"
 #include "EndpointIf.h"
 #include "logger.h"
+#include "WorkQueue.h"
 
+using namespace AsteriskSCF;
 using namespace AsteriskSCF::Core::Routing::V1;
 using namespace AsteriskSCF::Core::Endpoint::V1;
 using namespace AsteriskSCF::System::Logging;
@@ -101,7 +106,8 @@ private:
     size_t mMaxRetries;
     size_t mRetryIntervalMilliseconds;
     size_t mCounter;
-};
+
+}; // class RetryPolicy
 
 /**
  * Listener used to monitor sessions during the routing process. Primarily used to 
@@ -270,7 +276,9 @@ private:
     SessionSeq mSessions;
     bool mTerminated;
     SessionListenerPrx mListenerPrx;
-};
+
+}; // class SessionListenerImpl
+
 typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
 
 /**
@@ -353,7 +361,7 @@ public:
         try
         {
             // Only the adapter holds a smart pointer for this servant, so this will
-            // cause it to be delted.
+            // cause it to be deleted.
             lg(Debug) << "Removing listener from object adapter." ;
             mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
         }
@@ -363,7 +371,12 @@ public:
         }
     }
 
-    SessionListenerImpl* operator->()
+    SessionListenerImpl* operator->() const
+    {
+        return mSessionListener;
+    }
+
+    SessionListenerImpl* getListener() const
     {
         return mSessionListener;
     }
@@ -371,63 +384,138 @@ public:
 private:
     SessionListenerImpl *mSessionListener;
     Ice::ObjectAdapterPtr mAdapter;
+
+}; // class SessionListenerAllocator
+
+typedef boost::shared_ptr<SessionListenerAllocator> SessionListenerAllocatorPtr;
+/**
+ * Context required by all of the SessionRouter operations. 
+ *  All of the items in the SessionContext are thread-safe. (i.e. no lock required). 
+ */
+struct SessionContext
+{
+public:
+    /**
+     * Constructor. The BridgeManager isn't initialized, but configured via a setter.
+     */
+    SessionContext(const Ice::ObjectAdapterPtr& adapter,
+                            const EndpointRegistryPtr& registry,
+                            const RoutingEventsPtr& publisher,
+                            const boost::shared_ptr<WorkQueue> &mWorkQueue)
+                                  :  adapter(adapter),
+                                     endpointRegistry(registry),
+                                     eventPublisher(publisher),
+                                     workQueue(workQueue)
+    {
+    }
+
+    Ice::ObjectAdapterPtr adapter;
+    EndpointRegistryPtr endpointRegistry;
+    RoutingEventsPtr eventPublisher;
+    AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
+    boost::shared_ptr<WorkQueue> workQueue;
+};
+
+/** 
+ * An interface for an object that manages a collection of SessionRouterOperations. 
+ */
+class OperationsManager
+{
+public:
+    virtual void finished(WorkQueue::Work *) = 0;
+    virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation) = 0; 
+
+protected:
+    OperationsManager() {}
 };
 
 /**
- * Private operations and state of the SessionRouter. 
+ * This is a base class for worker objects that offload SessionRouter operations 
+ * to a worker thread. It implements the WorkQueue::Work
+ * interface so that it can be enqueued to a worker thread or thread pool. 
  */
-class SessionRouterPriv
+template<typename T>
+class SessionRouterOperation : public WorkQueue::Work
 {
 public:
-    SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
-        const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
-        mAdapter(objectAdapter),
-        mEndpointRegistry(endpointRegistry),
-        mEventPublisher(eventPublisher)
+    SessionRouterOperation(const T& amdCallback,
+                           const SessionContext& context,
+                           OperationsManager* manager,
+                           const boost::function<void ()> &initialStateHandler) 
+       : mInitiatorCallback(amdCallback),
+         mSessionContext(context),
+         mFinished(false),
+         mOperationsManager(manager),
+         mCurrentStateHandler(initialStateHandler)
     {
     }
 
-    ~SessionRouterPriv()
+    virtual ~SessionRouterOperation() 
     {
     }
 
-    /**
-     * Set the accessor to the bridge.
-     */
-    void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
-            SessionCommunications::V1::BridgeManagerPrx>&  bridgeAccessor)
+    virtual void doWork()
     {
-        mBridgeManager = bridgeAccessor;
+        mCurrentStateHandler();
+    }
+
+    void finishWithException(const ::std::exception& e)
+    {
+        // Forward to this operation's initiator.
+        mInitiatorCallback->ice_exception(e);
+        finish();
+    }
+
+    void finishWithException()
+    {
+        mInitiatorCallback->ice_exception();
+        finish();
+    }
+
+    void finishAndSendResult()
+    {
+        mInitiatorCallback->ice_response();
+        finish();
+    }
+    
+    void setLookupResult(const EndpointSeq& endpoints)
+    {
+        mLookupResult = endpoints;
+
+        // Reschedule the operation to complete. The operation will have 
+        // advanced to the correct state handler. 
+        mSessionContext.workQueue->enqueue(mOperationsManager->getOngoingOperationSharedPointer(this));
+    }
+
+protected: // These protected operations are utiltity functions. 
+
+    void finish()
+    {
+        mFinished = true;
+        mOperationsManager->finished(this);
     }
 
     /**
-     * Do a lookup of the requested endpoint.
+     * Initiate a lookup of the requested endpoint.
      */
-    EndpointSeq lookupEndpoints(const std::string& destination, const Ice::Current& current)
+    void lookupEndpoints(const std::string& destination, const ::Ice::Current current)
     {
-        EndpointSeq endpoints;
         try
         {
-            // Lookup the destination.
-            endpoints = mEndpointRegistry->lookup(destination, current);
+            // This component's own lookup interface is implemented as AMD. 
+            // We provide our override of AMD callback. 
+            AMD_EndpointLocator_lookupPtr lookupCallback;
 
-            if (endpoints.empty())
-            {
-                throw DestinationNotFoundException(destination);
-            }
-        }
-        catch (const DestinationNotFoundException&)
-        {
-            // rethrow
-            throw;
+            boost::shared_ptr<WorkQueue::Work> workPtr = mOperationsManager->getOngoingOperationSharedPointer(this);
+            lookupCallback = new LookupCallback<T>(this);
+
+            // Lookup the destination.
+            mSessionContext.endpointRegistry->lookup_async(lookupCallback, destination, current);
         }
-        catch (const Ice::Exception &)
+        catch (...)
         {
-            // Probably couldn't access the EndpointLocator of the registered channel.
-            throw EndpointUnreachableException(destination);
+            finishWithException();
         }
-
-        return endpoints;
     }
 
 
@@ -445,8 +533,7 @@ public:
             catch (const Ice::Exception &e)
             {
                 lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what();
-                // TBD... probably other bridge cleanup needs to be done.
-                throw;
+                finishWithException(e);
             }
         }
     }
@@ -456,7 +543,7 @@ public:
      */
     BridgePrx getBridge(SessionPrx session)
     {
-        BridgePrx result(0);
+        BridgePrx result;
 
         RetryPolicy policy(5, 500);
         while(policy.canRetry())
@@ -466,23 +553,26 @@ public:
                 result = session->getBridge();
                 break;
             }
-            catch(const Ice::ConnectionLostException&)
+            catch(const Ice::ConnectionLostException& cle)
             {
                 if(!policy.retry())
                 {
                     lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed "  << policy.maxRetries() << " retries." ;
-                    throw;
+                    finishWithException(cle);
+                    throw cle; 
                 }
             }
             catch(const NotBridged& e)
             {
                 lg(Error) << "getBridge(): session is not bridged." ;
-                throw e; // rethrow
+                finishWithException(e); 
+                throw e;
             }
             catch(const Ice::Exception& e)
             {
                 lg(Error) << "getBridge(): Ice exception getting bridge for session:"  << e.what();
-                throw e; // rethrow
+                finishWithException(e); 
+                throw e;
             }
         }
 
@@ -493,7 +583,7 @@ public:
      * Create a session to each of a given set of endpoints, and return a collection of the 
      * newly added sessions. 
      */
-    SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+    SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination)
     {
         // Add a session
         SessionSeq newSessions;
@@ -505,16 +595,16 @@ public:
 
                 // Create a session on the destination.
                 lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
-                SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
+                SessionPrx destSession = sessionEndpoint->createSession(destination, mListenerManager->getListener()->getProxy());
                 lg(Debug) << "  Session proxy: " << destSession->ice_toString() ;
 
-                listener->addSession(destSession);
+                mListenerManager->getListener()->addSession(destSession);
                 newSessions.push_back(destSession);
             }
             catch(const Ice::Exception &exception)
             {
                 lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
-                // We may be able to reach SOME of the endpoints.
+                // We may be able to reach SOME of the endpoints. 
             }
         }
         return newSessions;
@@ -523,19 +613,33 @@ public:
     /**
      * Accessor for the sessions in a bridge.
      *   @bridge The bridge whose sessions are to be accessed.
-     *   @except An optional session proxy to be excluded from the list of sessions. 
      */
-    SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except=0)
+    SessionSeq getSessionsInBridge(const BridgePrx& bridge)
     {
-        SessionSeq sessions; 
         try
         {
             SessionSeq allSessions = bridge->listSessions();
+        }
+        catch(const Ice::Exception &e)
+        {
+            lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
+            finishWithException(e); 
 
-            if (except == 0)
-            {
-                return allSessions;
-            }
+        }
+        return allSessions;
+    }
+
+    /**
+     * Accessor for the sessions in a bridge.
+     *   @bridge The bridge whose sessions are to be accessed.
+     *   @except Session proxy to be excluded from the list of sessions. 
+     */
+    SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except)
+    {
+        SessionSeq sessions; 
+        try
+        {
+            SessionSeq allSessions = bridge->listSessions();
 
             for(SessionSeq::iterator s = allSessions.begin(); s !=allSessions.end(); ++s)
             {
@@ -548,7 +652,7 @@ public:
         catch(const Ice::Exception &e)
         {
             lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
-            throw e; // rethrow
+            finishWithException(e); 
         }
         return sessions;
     }
@@ -583,165 +687,552 @@ public:
         return removedSessions;
     }
 
-public:
-    Ice::ObjectAdapterPtr mAdapter;
-    EndpointRegistryPtr mEndpointRegistry;
-    RoutingEventsPtr mEventPublisher;
-    AsteriskSCF::SmartProxy::SmartProxy<
-        SessionCommunications::V1::BridgeManagerPrx> mBridgeManager;
-};
+protected:
+    T mInitiatorCallback;
+    SessionContext mSessionContext;
+    WorkQueue::PoolId mPoolId;
 
-SessionRouter::SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
-    const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
-    mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher))
-{
-}
+    bool mFinished;
+    EndpointSeq mLookupResult;
+    SessionListenerAllocatorPtr mListenerManager;
+    OperationsManager* mOperationsManager;
+    boost::function<void ()> mCurrentStateHandler;     // Lightweight state machine. Current state handles doWork() for a given state.
 
-SessionRouter::~SessionRouter()
-{
-    mImpl.reset();
-}
-
-void SessionRouter::setBridgeManager(
-    const AsteriskSCF::SmartProxy::SmartProxy<
-        SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
-{
-    mImpl->mBridgeManager = bridgeAccessor;
-}
+}; // class SessionRouterOperation
 
 /**
- * Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
- *   TBD - Need to rework with asynch support.
+ * This is a specialization of the SessionRouterOperation to handle the
+ * routeSession() operation. This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread. 
  */
-void SessionRouter::routeSession(
-    const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
-    const std::string& destination,
-    const Ice::Current& current)
+class  RouteSessionOperation : public SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>
 {
-    lg(Debug) << "routeSession() entered with destination " << destination ;
+public:
+    RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener) 
+         : SessionRouterOperation(cb, 
+                                  context, 
+                                  listener, 
+                                  boost::bind(&RouteSessionOperation::lookupState, this)),
+           mInitiatorCallback(cb),
+           mSource(source),
+           mDestination(destination),
+           mIceCurrent(current)
+    {
+    }
 
-    if (!mImpl->mBridgeManager.initializeOnce())
+    virtual ~RouteSessionOperation() 
     {
-        lg(Error) << "No proxy to BridgeManager.  "
-            "Make sure all services are running.";
-        throw BridgingException(source->getEndpoint()->getId(), destination);
+         lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
     }
 
-    // Create a listener for the source to handle early termination.
-    // The wrapper we're using will remove the listener and free it when
-    // this method is left.
-    SessionListenerAllocator listener(mImpl->mAdapter, source);
+private:
+    /**
+     * We start routing the session by looking up endpoint of the destination.
+     * This method represents processing in our initial state. 
+     * It is executed off the worker thread. 
+     */
+    void lookupState()
+    {
+        lg(Debug) << "routeSession() entered with destination " << mDestination ;
+
+        if (!mSessionContext.bridgeManager.initializeOnce())
+        {
+            lg(Error) << "No proxy to BridgeManager.  "
+                "Make sure all services are running.";
+
+            finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+            return;
+        }
 
-    // Route the destination
-    lg(Debug) << "routeSession(): Routing destination " << destination;
-    EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+        // Create a listener for the source to handle early termination.
+        // The allocator is kept in mListenerManager so that it remains
+        // available to the later states of this operation.
+        SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSource));
+        mListenerManager = listener;
 
-    // Add a session to the endpoints.
-    SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+        // Set the state to execute once we've looked up our endpoints.
+        mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
 
-    if (listener->getNumSessions() < 2)
+        // Lookup the destination. This will use AMI, and the callback should 
+        // schedule us to execute again. 
+        lg(Debug) << "routeSession(): Routing destination " << mDestination;
+        lookupEndpoints(mDestination, mIceCurrent);
+    }
+
+    /**
+     * Entering this state, the destination endpoint has been obtained. This state
+     * completes the operation by creating the bridge. 
+     * 
+     * This operation is the final state, and is executed off the worker thread. 
+     */
+    void establishBridgeState()
+    {
+        if (mFinished)
+        {
+            return;
+        }
+
+        assert(mEndpoints.size() > 0);
+
+        // Add a session to the endpoints.
+        SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
+
+        if (mListenerManager->getListener()->getNumSessions() < 2)
+        {
+            finishWithException(SessionCreationException(mDestination));
+            return;
+        }
+
+        if (mListenerManager->getListener()->isTerminated())
+        {
+            finishWithException(SourceTerminatedPreBridgingException(mSource->getEndpoint()->getId()));
+            return;
+        }
+
+        // We're through listening, and we will probably interfere with the
+        // Bridge's functionality if we keep listening.
+        mListenerManager->getListener()->unregister();
+
+        // Create the bridge
+        BridgePrx bridge;
+        try
+        {
+            SessionSeq bridgedSessions;
+            bridgedSessions.push_back(mSource);
+
+            bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
+            bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+
+            lg(Debug) << "routeSession(): Creating bridge.";
+            bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+        }
+        catch (const Ice::Exception &e)
+        {
+            lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+
+            finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+            return;
+        }
+
+        // Forward the start to all the destinations routed to.
+        lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
+        forwardStart(newSessions);
+
+        // This operation is complete. Send AMD responses. 
+        finishAndSendResult();
+    }
+
+private:
+    // Operation input params. 
+    AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
+    SessionPrx mSource;
+    string mDestination;
+    ::Ice::Current mIceCurrent;
+    
+    // Implementation state
+    EndpointSeq mEndpoints;
+
+}; // class RouteSessionOperation
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable 
+ * by the destination param.
+ * 
+ * This is a specialization of the SessionRouterOperation that handles the
+ * connectBridgedSessionsWithDestination() operation. This object is an instance 
+ * of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+ class  ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+{
+public:
+    ConnectBridgedSessionsWithDestinationOperation(const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener)
+         : SessionRouterOperation(cb, 
+                                  context, 
+                                  listener, 
+                                  boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)),
+           mInitiatorCallback(cb),
+           mSessionToReplace(sessionToReplace),
+           mDestination(destination),
+           mIceCurrent(current)
     {
-        throw SessionCreationException(destination);
     }
 
-    if (listener->isTerminated())
+    virtual ~ConnectBridgedSessionsWithDestinationOperation() 
     {
-        throw SourceTerminatedPreBridgingException(source->getEndpoint()->getId());
+         lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
     }
 
-    // We're through listening, and we will probably interfere with the
-    // Bridge's functionality if we keep listening.
-    listener->unregister();
+private:
+
+    void lookupState()
+    {
+        lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << mDestination;
+
+        mBridge = mSessionToReplace->getBridge();
+
+        mRemainingSessions = getSessionsInBridge(mBridge, mSessionToReplace);
+
+        // Create a listener for the sessions not being replaced to handle early termination.
+        // The allocator is kept in mListenerManager so that it remains
+        // available to the later states of this operation.
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
+        SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSessionToReplace));
+        mListenerManager = listener;
 
-    // Create the bridge
-    BridgePrx bridge;
-    try
+        // Route the destination
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
+
+        // Set the state to execute after lookup.
+        mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
+
+        // Lookup the destination. This will use AMI, and the callback should 
+        // schedule us to execute again. 
+        lg(Debug) << "routeSession(): Routing destination " << mDestination;
+        lookupEndpoints(mDestination, mIceCurrent);
+    }
+
+    /**
+     * Entering this state, the destination endpoint has been obtained. 
+     * This operation is invoked from the worker thread, having been scheduled by
+     * the callback from lookup. 
+     */
+    void establishBridgeState()
     {
-        SessionSeq bridgedSessions;
-        bridgedSessions.push_back(source);
+        if (mFinished)
+        {
+            return;
+        }
 
-        bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
-        bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+        // Add a session 
+        SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
 
-        lg(Debug) << "routeSession(): Creating bridge.";
-        bridge = mImpl->mBridgeManager->createBridge(bridgedSessions, 0);
+        if (mListenerManager->getListener()->getNumSessions() < 2)
+        {
+            lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << mDestination << " in connectBridgedSessionsWithDestination(). " ;
+            finishWithException(SessionCreationException(mDestination));
+            return;
+        }
+
+        if (mListenerManager->getListener()->isTerminated())
+        {
+            lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
+            finishWithException( SourceTerminatedPreBridgingException(mRemainingSessions[0]->getEndpoint()->getId()));
+            return;
+        }
+
+        // We're through listening, and we will probably interfere with the Bridge's functionality if
+        // we keep listening.
+        mListenerManager->getListener()->unregister();
+
+        // Modify the bridge
+        try
+        {
+            lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << mDestination;
+            mBridge->replaceSession(mSessionToReplace, newSessions);
+        }
+        catch (const Ice::Exception &e)
+        {
+            lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
+            finishWithException(BridgingException(mRemainingSessions[0]->getEndpoint()->getId(), mDestination));
+            return;
+        }
+
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
+        forwardStart(newSessions);
+
+        // This operation is complete. Send AMD responses. 
+        finishAndSendResult();
+    }
+
+private:
+    // Operation input params. 
+    AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
+    SessionPrx mSessionToReplace;
+    string mDestination;
+    ::Ice::Current mIceCurrent;
+
+    // Implementation state
+    EndpointSeq mEndpoints;
+    BridgePrx mBridge;
+    SessionSeq mRemainingSessions;
+
+}; // class ConnectBridgedSessionsWithDestinationOperation
+
+/**
+ * This is a specialization of the SessionRouterOperation that handles the
+ * connectBridgedSessions() operation. 
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency.
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+class  ConnectBridgedSessionsOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>
+{
+public:
+    ConnectBridgedSessionsOperation(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
+                          const ::Ice::Current& current,
+                          const SessionContext& context,
+                          OperationsManager* const listener)
+         : SessionRouterOperation(cb, context, listener, boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)),
+           mInitiatorCallback(cb),
+           mSessionToReplace(sessionToReplace),
+           mBridgedSession(bridgedSession),
+           mIceCurrent(current)
+    {
     }
-    catch (const Ice::Exception &e)
+
+    virtual ~ConnectBridgedSessionsOperation() 
     {
-        lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+         lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
+    }
+
+private:
+    /**
+     * Replace one session in a Bridge with sessions from another bridge.
+     * No routing is actually performed. This operation exists here for consistency.
+     */
+    void connectBridgedSessionsState()
+    {
+        lg(Debug) << "connectBridgedSessions() entered... ";
+
+        // Get the bridge being merged into.
+        BridgePrx mergeBridge = getBridge(mSessionToReplace);
+
+        SessionSeq preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
+
+        // Create a listener for the sessions not being replaced to handle early termination.
+        // The wrapper we're using will remove the listener and free it when
+        // this method is left.
+        lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
+        SessionListenerAllocator listener(mSessionContext.adapter, preserveSessions);
+
+        // Get the bridge for the sessions being moved.
+        BridgePrx oldBridge = getBridge(mBridgedSession);
+
+        SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
+
+        // Check for early termination by the source.
+        if (listener->isTerminated())
+        {
+            lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
+            finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
+            return;
+        }
+
+        // We're through listening, and we will probably interfere with the Bridge's functionality if
+        // we keep listening.
+        lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
         listener->unregister();
 
-        throw BridgingException(source->getEndpoint()->getId(), destination);
+        // Now replace the sessions.
+        try
+        {
+            lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
+            mergeBridge->replaceSession(mSessionToReplace, migratingSessions);
+        }
+        catch(const Ice::Exception& e)
+        {
+            lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
+            finishWithException(e); // rethrow
+            return;
+        }
+
+        // This operation is complete. Send AMD responses. 
+        finishAndSendResult();
     }
 
+private:
+    // Operation input params. 
+    AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
+    SessionPrx mSessionToReplace;
+    SessionPrx mBridgedSession;
+    ::Ice::Current mIceCurrent;
 
-    // Forward the start to all the destinations routed to.
-    lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
-    mImpl->forwardStart(newSessions);
+}; // class ConnectBridgedSessionsOperation
 
-} // SessionRouter::routeSession(...)
 
-/**
- * Replace one session in a Bridge with a new
- * session routable by the destination param.
- *   @param source The session initiating the routing event.
- *   @param destination The address or id of the destination to be routed.
+/** 
+ * An implementation of the AMD_EndpointLocator_lookup callback so 
+ * that we can call our own lookup operation. 
+ * Note that we're not really using AMD, but we're using the same
+ * AMD implementation that other components would use to do a lookup(). 
  */
-void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sessionToReplace,
-    const ::std::string& destination,
-    const Ice::Current& current)
+template <typename T>
+class LookupCallback : public AMD_EndpointLocator_lookup
 {
-    lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination;
+public:
+    LookupCallback(SessionRouterOperation<T>* operation)
+        : mOperation(operation)
+    {
+    }
+
+public: // Overrides. 
+
+    virtual void ice_exception(const ::std::exception& e)
+    {
+        mOperation->finishWithException(e);
+    }
+
+    virtual void ice_exception()
+    {
+        mOperation->finishWithException();
+    }
 
-    BridgePrx bridge(sessionToReplace->getBridge());
+    virtual void ice_response(const ::AsteriskSCF::Core::Endpoint::V1::EndpointSeq& endpoints)
+    {
+        mOperation->setLookupResult(endpoints);
+    }
 
-    SessionSeq remainingSessions = mImpl->getSessionsInBridge(bridge, sessionToReplace);
+private:
+    SessionRouterOperation<T>* mOperation;
+};
 
-    // Create a listener for the sessions not being replaced to handle early termination.
-    // The wrapper we're using will remove the listener and free it when
-    // this method is left.
-    lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
-    SessionListenerAllocator listener(mImpl->mAdapter, remainingSessions);
+typedef map<WorkQueue::Work*, boost::shared_ptr<WorkQueue::Work> > OperationMap;
 
-    // Route the destination
-    lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << destination;
-    EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+/**
+ * Private operations and state of the SessionRouter. 
+ */
+class SessionRouterPriv : public OperationsManager
+{
+public:
+    SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter, 
+                      const EndpointRegistryPtr& endpointRegistry,
+                      const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+                      const boost::shared_ptr<WorkQueue>& workQueue) :
+                         mSessionContext(objectAdapter,
+                                  endpointRegistry,
+                                  eventPublisher,
+                                  workQueue)
+    {
+    }
 
-    // Add a session 
-    SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+    ~SessionRouterPriv()
+    {
+    }
 
-    if (listener->getNumSessions() < 2)
+    /**
+     * Set the accessor to the bridge. 
+     */
+    void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
+            SessionCommunications::V1::BridgeManagerPrx>&  bridgeAccessor)
     {
-        lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << destination << " in connectBridgedSessionsWithDestination(). " ;
-        throw SessionCreationException(destination);
+        mSessionContext.bridgeManager = bridgeAccessor;
     }
 
-    if (listener->isTerminated())
+    void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
     {
-        lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
-        throw SourceTerminatedPreBridgingException(remainingSessions[0]->getEndpoint()->getId());
+        mOngoingOperations[work.get()] = work;
+        mSessionContext.workQueue->enqueue(work);
     }
-    // We're through listening, and we will probably interfere with the Bridge's functionality if
-    // we keep listening.
-    listener->unregister();
 
-    // Modify the bridge
-    try
+public: // Overrides
+
+    /**
+     * Handle a notice from an operation that it has completed. 
+     * Remove our shared_ptr reference so that it will die. 
+     */
+    virtual void finished(WorkQueue::Work* op)
     {
-        lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << destination;
-        bridge->replaceSession(sessionToReplace, newSessions);
+        boost::lock_guard<boost::mutex> guard(mLock);
+        OperationMap::iterator kvp = mOngoingOperations.find(op);
+
+        if (kvp != mOngoingOperations.end())
+        {
+            lg(Debug) << "Removing reference to finished opeation.";
+            mOngoingOperations.erase(kvp);
+        }
     }
-    catch (const Ice::Exception &e)
+
+    /** 
+     * The operations sometimes need a shared_ptr to themselves to hand off to callbacks or
+     * other objects being processed in other threads. 
+     */
+    virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
     {
-        lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
-        throw BridgingException(remainingSessions[0]->getEndpoint()->getId(), destination);
+        boost::lock_guard<boost::mutex> guard(mLock);
+        OperationMap::iterator kvp = mOngoingOperations.find(operation);
+
+        assert(kvp != mOngoingOperations.end());
+        return (*kvp).second;
     }
 
-    lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
-    mImpl->forwardStart(newSessions);
+public:
+    SessionContext mSessionContext;
+    OperationMap mOngoingOperations;
+    boost::mutex mLock;
+};
 
-} // SessionRouter::connectBridgedSessionsWithDestination(...)
+SessionRouter::SessionRouter(
+                  const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
+                  const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+                  const boost::shared_ptr<WorkQueue>& workQueue) 
+            : mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher, workQueue))
+{
+}
 
+SessionRouter::~SessionRouter()
+{
+    mImpl.reset();
+}
+
+void SessionRouter::setBridgeManager(
+    const AsteriskSCF::SmartProxy::SmartProxy<
+        SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+{
+    mImpl->mSessionContext.bridgeManager = bridgeAccessor;
+}
+
+/**
+ * Route the session by looking up the destination endpoint and configuring a complementary session for the destination.
+ */
+void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                                       const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                                       const ::std::string& destination, 
+                                       const ::Ice::Current& current)
+{
+    WorkPtr routeSessionOp(new RouteSessionOperation(cb, 
+                                                   source, 
+                                                   destination, 
+                                                   current, 
+                                                   mImpl->mSessionContext,
+                                                   mImpl.get()));
+    
+
+    mImpl->scheduleOperation(routeSessionOp);
+}
+
+/**
+ * Replace one session in a Bridge with a new
+ * session routable by the destination param.
+ *   @param source The session initiating the routing event.
+ *   @param destination The address or id of the destination to be routed.
+ */
+void SessionRouter::connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb, 
+                                                                const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                                const ::std::string& destination, 
+                                                                const ::Ice::Current& current)
+{
+    WorkPtr connectBridgedSessionsWithDestinationOp(new ConnectBridgedSessionsWithDestinationOperation(cb, 
+                                                   sessionToReplace, 
+                                                   destination, 
+                                                   current, 
+                                                   mImpl->mSessionContext,
+                                                   mImpl.get()));
+
+    mImpl->scheduleOperation(connectBridgedSessionsWithDestinationOp);
+
+}
 
 /**
  * Replace one session in a Bridge with sessions from another bridge.
@@ -756,52 +1247,23 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
  *   their current bridge before being added to the bridge currently attached to
  *   sessionToReplace.
  */
-void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
-    const SessionPrx& bridgedSession,
-    const Ice::Current&)
-{
-    lg(Debug) << "connectBridgedSessions() entered... ";
-
-    // Get the bridge being merged into.
-    BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
-
-    SessionSeq preserveSessions = mImpl->getSessionsInBridge(mergeBridge, sessionToReplace);
-
-    // Create a listener for the sessions not being replaced to handle early termination.
-    // The wrapper we're using will remove the listener and free it when
-    // this method is left.
-    lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
-    SessionListenerAllocator listener(mImpl->mAdapter, preserveSessions);
 
-    // Get the bridge for the sessions being moved.
-    BridgePrx oldBridge = mImpl->getBridge(bridgedSession);
+void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb, 
+                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,  
+                                                 const ::Ice::Current& current)
+{
+    WorkPtr connectBridgedSessionsOp(new ConnectBridgedSessionsOperation(cb, 
+                                                   sessionToReplace, 
+                                                   bridgedSession, 
+                                                   current, 
+                                                   mImpl->mSessionContext,
+                                                   mImpl.get()));
 
-    SessionSeq migratingSessions = mImpl->removeSessionsFromBridge(oldBridge, bridgedSession);
+    mImpl->scheduleOperation(connectBridgedSessionsOp);
 
-    // Check for early termination by the source.
-    if (listener->isTerminated())
-    {
-        lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
-        throw SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId());
-    }
-    // We're through listening, and we will probably interfere with the Bridge's functionality if
-    // we keep listening.
-    lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
-    listener->unregister();
-
-    // Now replace the sessions.
-    try
-    {
-        lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
-        mergeBridge->replaceSession(sessionToReplace, migratingSessions);
-    }
-    catch(const Ice::Exception& e)
-    {
-        lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
-        throw e; // rethrow
-    }
+}
 
-} // SessionRouter::connectBridgedSessions(...)
 
 } // end BasicRoutingService
 } // end AsteriskSCF
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index d955519..014f3ce 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -21,6 +21,7 @@
 #include "SmartProxy.h"
 #include "SessionCommunications/SessionCommunicationsIf.h"
 #include "EndpointRegistry.h"
+#include "WorkQueue.h"
 
 namespace AsteriskSCF
 {
@@ -34,8 +35,10 @@ class SessionRouterPriv;
 class SessionRouter : public AsteriskSCF::SessionCommunications::V1::SessionRouter
 {
 public:
-    SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,  const EndpointRegistryPtr& endpointRegistry,
-        const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher);
+    SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,  
+                  const EndpointRegistryPtr& endpointRegistry,
+                  const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+                  const boost::shared_ptr<WorkQueue>& sessionRouterWorkQueue);
     ~SessionRouter();
 
     void setBridgeManager(
@@ -55,6 +58,11 @@ public:
         const std::string& destination,
         const Ice::Current&);
 
+    virtual void routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                                    const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                                    const ::std::string& destination, 
+                                    const ::Ice::Current&);
+
     /**
      * Replace a session in a bridge with a destination. The destination will be routed.
      *   @param sessionToReplace The session to be replaced in a bridge. The affected Bridge interface is
@@ -65,6 +73,11 @@ public:
         const ::std::string& destination,
         const Ice::Current&);
 
+    virtual void connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb, 
+                                                             const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                             const ::std::string& destination, 
+                                                             const ::Ice::Current& );
+
     /**
      * Replace a session in a bridge with another session. If the newSession is already participating in a Bridge,
      * it will be removed from its current bridge prior to being used as a replacement.
@@ -76,6 +89,11 @@ public:
         const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
         const Ice::Current&);
 
+    virtual void connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb, 
+                                              const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                              const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,  
+                                              const ::Ice::Current&);
+
 private:
     boost::shared_ptr<SessionRouterPriv> mImpl;
 };
diff --git a/src/SimpleWorkQueue.cpp b/src/SimpleWorkQueue.cpp
new file mode 100644
index 0000000..f55ff82
--- /dev/null
+++ b/src/SimpleWorkQueue.cpp
@@ -0,0 +1,273 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+/**
+ * A simple Work Queue implementation. On construction, starts an internal thread. 
+ * Work can be enqueued via the thread-safe enqueue() method. All work must implement 
+ * the Work interface.
+ */
+#include <iostream>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
+#include <list>
+
+#include "logger.h"
+
+#include "SimpleWorkQueue.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::System::Logging;
+using namespace boost;
+
+namespace AsteriskSCF
+{
+/**
+ * Private state of a SimpleWorkQueue: the pending work list, the worker
+ * thread that runs execute(), and the flags, mutexes and conditions used
+ * to coordinate with that thread.
+ */
+class SimpleWorkQueuePriv
+{
+public:
+    /**
+     * Stores the logger and queue id, clears the state flags and starts the
+     * worker thread on execute().
+     *   @param id Identifier of the queue, used in log messages.
+     *   @param logger Logger to write to. Only a reference is kept, so the
+     *                 logger must outlive this object.
+     */
+    SimpleWorkQueuePriv(const std::string& id, const Logger& logger)
+        : mLogger(logger),
+          mQid(id),
+          mInitialized(false),
+          mFinished(false),
+          mPaused(false),      // runs by default.
+          mThread(boost::bind(&SimpleWorkQueuePriv::execute, this))
+    {
+        mLogger(Debug) << "SimpleWorkQueue::private_impl constructor called. Queue ID:" << mQid;
+    }
+
+    ~SimpleWorkQueuePriv()
+    {
+        mLogger(Debug) << "SimpleWorkQueue::private_impl desctuctor called. Queue ID:" << mQid;
+    }
+
+    WorkPtr dequeue();
+    WorkPtr waitAndDequeue();
+    void execute();
+    bool isPaused();
+
+    // Members are initialized in declaration order, so the initializer list
+    // above is written in this same order.
+    const Logger& mLogger;
+    std::string mQid;
+    bool mInitialized;
+    bool mFinished;
+    bool mPaused;
+    std::list<WorkPtr> mQueue;
+    boost::mutex mQueueMutex;
+    boost::condition mEmptyQueueCondition;
+    boost::mutex mPauseMutex;
+    boost::condition mPauseCondition;
+
+    // Declared last on purpose: constructing this member starts the worker
+    // thread, so every mutex and condition it uses must already exist.
+    boost::thread mThread;
+};
+}
+
+/**
+ * Creates the private implementation (which starts the worker thread) and
+ * then marks the queue as initialized.
+ *   @param qid Identifier of the queue, used in log messages.
+ *   @param logger Logger passed on to the private implementation.
+ */
+SimpleWorkQueue::SimpleWorkQueue(const std::string& qid, const Logger& logger)
+    : mImpl(new SimpleWorkQueuePriv(qid, logger))
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+
+    impl.mLogger(Debug) << "SimpleWorkQueue::Constructor() called. Queue ID:" << impl.mQid;
+
+    // execute() begins by looping while mInitialized is false, so the flag
+    // is raised only once construction is complete.
+    impl.mInitialized = true;
+}
+
+/**
+ * Terminates the queue and waits for the worker thread to exit.
+ */
+SimpleWorkQueue::~SimpleWorkQueue()
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+
+    impl.mLogger(Debug) << "SimpleWorkQueue::Destructor() called. Queue ID:" << impl.mQid;
+
+    terminate();
+
+    // The worker must be joined before the implementation goes away;
+    // otherwise execute() would keep running with a stale "this" pointer.
+    impl.mThread.join();
+}
+
+/**
+ * Reports whether the queue is initialized and neither paused nor finished.
+ */
+bool SimpleWorkQueue::isRunning()
+{
+    if (!mImpl->mInitialized)
+    {
+        return false;
+    }
+
+    if (mImpl->mPaused)
+    {
+        return false;
+    }
+
+    return !mImpl->mFinished;
+}
+
+/**
+ * Pause the SimpleWorkQueue's thread by setting the paused flag while
+ * holding the pause mutex.
+ */
+void SimpleWorkQueue::pause()
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+
+    impl.mLogger(Info) << "SimpleWorkQueue::Pause called for queue " << impl.mQid;
+
+    boost::mutex::scoped_lock pauseGuard(impl.mPauseMutex);
+    impl.mPaused = true;
+}
+
+/**
+ * Resume from a Paused state: clears the paused flag under the pause mutex
+ * and notifies every waiter on the pause condition.
+ */
+void SimpleWorkQueue::resume()
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+
+    impl.mLogger(Info) << "SimpleWorkQueue::Resume called for queue " << impl.mQid;
+
+    boost::mutex::scoped_lock pauseGuard(impl.mPauseMutex);
+    impl.mPaused = false;
+    impl.mPauseCondition.notify_all();
+}
+
+/**
+ * Stops this thread from executing.
+ *
+ * Marks the queue as finished, clears the paused flag and wakes any thread
+ * blocked on the pause condition or the empty-queue condition.
+ *
+ * Each flag is written while holding the mutex its waiter uses. Without
+ * that, a notification could be issued after the worker has tested the
+ * flag but before it has started waiting, and the worker would then sleep
+ * forever (and the destructor's join() would never return).
+ */
+void SimpleWorkQueue::terminate()
+{
+    mImpl->mLogger(Info) << "SimpleWorkQueue::Terminate called for queue " << mImpl->mQid ;
+
+    // waitAndDequeue() tests mFinished while holding mQueueMutex.
+    {
+        boost::mutex::scoped_lock queueLock(mImpl->mQueueMutex);
+        mImpl->mFinished = true;
+    }
+
+    // mPaused is guarded by mPauseMutex everywhere else (pause/resume/isPaused).
+    {
+        boost::mutex::scoped_lock pauseLock(mImpl->mPauseMutex);
+        mImpl->mPaused = false;
+    }
+
+    // The locks are taken one after the other, never nested, so no lock
+    // ordering is imposed on other code.
+    mImpl->mPauseCondition.notify_all();      // In case the thread was waiting on the PauseCondition.
+    mImpl->mEmptyQueueCondition.notify_all(); // In case the thread was waiting on an EmptyQueueCondition
+}
+
+/**
+ * A convenience method to determine if there is any pending work on the queue.
+ *
+ * The queue is inspected while holding the queue mutex, because enqueue()
+ * and waitAndDequeue() modify it under that same mutex on other threads.
+ * The answer is only a snapshot: it can be out of date as soon as this
+ * method returns.
+ *   @return true if at least one work item was queued at the time of the call.
+ */
+bool SimpleWorkQueue::workPending()
+{
+    boost::mutex::scoped_lock lock(mImpl->mQueueMutex);
+    return !mImpl->mQueue.empty();
+}
+
+/**
+ * Lets another thread wait for this queue's worker thread to finish.
+ * The caller must also call this object's terminate() method; otherwise
+ * the join blocks indefinitely.
+ */
+void SimpleWorkQueue::join()
+{
+    boost::thread& worker = mImpl->mThread;
+    worker.join();
+}
+
+// Default-constructed pool id returned by every enqueue() call.
+static WorkQueue::PoolId mNoOpPoolId;
+
+/**
+ * Enqueue an item of work for processing on this queue's thread.
+ *
+ * The item is appended while holding the queue mutex. If the queue was
+ * empty beforehand, waiters on the empty-queue condition are notified
+ * after the mutex has been released.
+ *   @param w The work item to append to the queue.
+ *   @return Always the same default-constructed PoolId.
+ */
+WorkQueue::PoolId SimpleWorkQueue::enqueue(WorkPtr w)
+{
+    bool wasEmpty = false;
+
+    {
+        boost::mutex::scoped_lock lock(mImpl->mQueueMutex);
+        wasEmpty = mImpl->mQueue.empty();
+        mImpl->mQueue.push_back(w);
+    }
+
+    // A non-empty queue means the worker is not blocked in waitAndDequeue(),
+    // so a notification is only needed on the empty -> non-empty transition.
+    if (wasEmpty)
+    {
+        mImpl->mEmptyQueueCondition.notify_all();
+    }
+
+    return mNoOpPoolId;
+}
+
+/**
+ * Private work item whose doWork() does nothing. waitAndDequeue() returns
+ * the shared instance below when the queue is finished while it is empty.
+ */
+class NO_WORK_CLASS : public WorkQueue::Work
+{
+public:
+    NO_WORK_CLASS()
+    {
+    }
+
+    // Intentionally empty: there is nothing to do.
+    void doWork()
+    {
+    }
+};
+static boost::shared_ptr<WorkQueue::Work> NO_WORK_PTR(new NO_WORK_CLASS());
+
+/**
+ * This method returns the next work from the queue. If no work is available,
+ * this method waits on the EmptyQueueCondition.
+ *
+ * The queue mutex is held for the whole call except while waiting on the
+ * condition.
+ *   @return The item at the front of the queue, or the NO_WORK token if the
+ *           queue is empty and has been marked finished.
+ */
+WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
+{
+    boost::mutex::scoped_lock lock(mQueueMutex);
+
+    // Loop rather than test once: the condition is re-checked after every
+    // wake-up, so spurious wake-ups and termination are both handled.
+    while (mQueue.empty())
+    {
+        mLogger(Debug) << "SimpleWorkQueue::WaitAndDequeue: Waiting on empty queue. Queue ID:" << mQid;
+
+        if (mFinished)
+        {
+            mLogger(Info) << "SimpleWorkQueue::WaitAndDequeue: Returning the NO_WORK token. Queue ID:" << mQid;
+            return NO_WORK_PTR;
+        }
+
+        mEmptyQueueCondition.wait(lock);
+    }
+
+    mLogger(Debug) << "SimpleWorkQueue::WaitAndDequeue: Dequeuing some work. Queue ID:" << mQid;
+
+    WorkPtr work = mQueue.front();
+    mQueue.pop_front();
+
+    return work;
+}
+
+/**
+ * Reads the paused flag while holding the pause mutex.
+ */
+bool SimpleWorkQueuePriv::isPaused()
+{
+    boost::mutex::scoped_lock pauseGuard(mPauseMutex);
+    const bool pausedNow = mPaused;
+    return pausedNow;
+}
+
+/**
+ * This is the thread's event loop. The thread terminates when this method returns.
+ */
+void SimpleWorkQueuePriv::execute()
+{
+   while (!mInitialized)
+   {
... 247 lines suppressed ...


-- 
asterisk-scf/release/routing.git



More information about the asterisk-scf-commits mailing list