[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Apr 21 18:17:54 CDT 2011


branch "route_replica" has been updated
       via  f5ffe27cceeb1c20efc7c4e662f88573655bbcd0 (commit)
      from  633bee49be17bde2601421db442dcc95ab141fdb (commit)

Summary of changes:
 config/routingtest-integ.config.in     |   11 +-
 src/BasicRoutingServiceApp.cpp         |  251 +++++++++++++++++++------------
 src/BasicRoutingStateReplicatorApp.cpp |  224 +++++++++++++++++++++--------
 src/EndpointRegistry.cpp               |   50 +++----
 src/OperationReplicaCache.cpp          |  194 ++-----------------------
 src/OperationReplicaCache.h            |  119 ++++++++++++++--
 src/RoutingStateReplicatorListener.cpp |   16 +-
 src/SessionRouter.cpp                  |    4 +-
 8 files changed, 478 insertions(+), 391 deletions(-)


- Log -----------------------------------------------------------------
commit f5ffe27cceeb1c20efc7c4e662f88573655bbcd0
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu Apr 21 18:17:17 2011 -0500

    Incorporating review feedback.

diff --git a/config/routingtest-integ.config.in b/config/routingtest-integ.config.in
index 1c64594..5945cf2 100644
--- a/config/routingtest-integ.config.in
+++ b/config/routingtest-integ.config.in
@@ -1,5 +1,7 @@
 # This is a configuration file a single process test of the Routing Service.
 
+Ice.Warn.UnknownProperties=0
+
 #Ice.Admin.Endpoints=tcp -p 10006
 #Ice.Admin.InstanceName=IceBox
 IceBox.InstanceName=IceBox
@@ -36,13 +38,15 @@ BridgeManager.ServiceLocatorId=BridgeService
 # Routing Service properties
 
 RoutingService.Endpoints=tcp -p 10050
+RoutingService.ComponentService.Endpoints=tcp -p 10051
 RoutingService.ThreadPool.Size=4
 RoutingService.ThreadPool.SizeMax=10
 RoutingService.ThreadPool.SizeWarn=9
 RoutingService.Standby=no
 RoutingService.StateReplicatorName=Replicator
 
-RoutingService2.Endpoints=tcp -p 10051
+RoutingService2.Endpoints=tcp -p 10052
+RoutingService2.ComponentService.Endpoints=tcp -p 10053
 RoutingService2.ThreadPool.Size=4
 RoutingService2.ThreadPool.SizeMax=10
 RoutingService2.ThreadPool.SizeWarn=9
@@ -50,8 +54,9 @@ RoutingService2.Standby=yes
 RoutingService2.StateReplicatorName=Replicator
 
 Replicator.InstanceName=Replicator
-Replicator.RoutingReplicator.Endpoints=default -p 10052
-Replicator.RoutingReplicator.ThreadPool.Size=4
+Replicator.Endpoints=default -p 10054
+Replicator.ComponentService.Endpoints=default -p 10055
+Replicator.ThreadPool.Size=4
 
 # AsteriskSCF.RoutingService.logger=Debug
 
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index f2a762f..36341fc 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -61,6 +61,12 @@ namespace AsteriskSCF
 namespace BasicRoutingService
 {
 
+class ComponentServiceImpl;
+typedef ::IceUtil::Handle<ComponentServiceImpl> ComponentServiceImplPtr;
+
+class ReplicaManagement;
+typedef ::IceUtil::Handle<ReplicaManagement> ReplicaManagementPtr;
+
 class BasicRoutingServiceApp : public IceBox::Service
 {
 public:
@@ -87,27 +93,26 @@ public:
     void onStandby();
     bool isActive();
 
-    /**
-     * Get an impementation-specific ID for createing service discovery guids. 
-     * This is not the same as the app name, which is configurable. 
-     */
-    std::string getImplementationId() {return mImplementationId;}
+    ////// Overrides of IceBox::Service
 
-public:   // Overrides of IceBox::Service
-    virtual void start(const string& name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
+    virtual void start(const string& name, const ::Ice::CommunicatorPtr& ic, const ::Ice::StringSeq& args);
     virtual void stop();
 
 private:
+    void suspendService(bool shuttingDown);
     void initialize();
     void locateBridgeManager();
-    void locateStateReplicator(bool isActive);
-    void registerWithServiceLocator();
-    void deregisterFromServiceLocator();
-    void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category);
+    void locateStateReplicator();
+    void registerWithServiceLocator(bool includeComponentService);
+    void deregisterFromServiceLocator(bool includeComponentService);
 
     void listenToStateReplicator();
     void stopListeningToStateReplicator();
 
+    /**
+     * Get an impementation-specific ID for createing service discovery guids. 
+     * This is not the same as the app name, which is configurable. 
+     */
     const std::string mImplementationId; 
     bool mDone;
     bool mInitialized;
@@ -129,7 +134,7 @@ private:
     // Our published interfaces.
     BasicSessionRouterPtr mSessionRouter;
     RoutingServiceAdminPtr mAdminInteface;
-    ComponentServicePtr mComponentService;
+    ComponentServiceImplPtr mComponentService;
     ComponentTestPtr mComponentTest;
     AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> mBridgeManager;
     RoutingServiceEventPublisherPtr mEventPublisher;
@@ -138,7 +143,7 @@ private:
 
     // Replication support
     ReplicationContextPtr mReplicationContext;
-    ReplicaPtr mReplicaManagement;
+    ReplicaManagementPtr mReplicaManagement;
     AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
     RoutingStateReplicatorListenerPtr mReplicatorListener;
 
@@ -146,9 +151,11 @@ private:
     bool mListeningToReplicator;
 
     // Implementation
-    Ice::ObjectAdapterPtr mAdapter;
-    Ice::CommunicatorPtr mCommunicator;
+    ::Ice::ObjectAdapterPtr mAdapter;
+    ::Ice::ObjectAdapterPtr mComponentServiceAdapter;
+    ::Ice::CommunicatorPtr mCommunicator;
 };
+typedef ::IceUtil::Handle<BasicRoutingServiceApp> BasicRoutingServiceAppPtr;
 
 static const string RegistryLocatorObjectId("RoutingServiceLocatorRegistry");
 static const string RoutingAdminObjectId("RoutingAdmin");
@@ -163,29 +170,34 @@ static const string ReplicaServiceId("BasicRoutingServiceReplica");
 class ComponentServiceImpl : public ComponentService
 {
 public:
-    ComponentServiceImpl(BasicRoutingServiceApp &app) :
+    ComponentServiceImpl(BasicRoutingServiceApp* app) :
         mApp(app)
     {
     }
 
 public: // Overrides of the ComponentService interface.
-    void suspend(const Ice::Current&)
+    void suspend(const ::Ice::Current&)
+    {
+        mApp->suspend();
+    }
+
+    void resume(const ::Ice::Current&)
     {
-        mApp.suspend();
+        mApp->resume();
     }
 
-    void resume(const Ice::Current&)
+    void shutdown(const ::Ice::Current&)
     {
-        mApp.resume();
+        mApp->stop();
     }
 
-    void shutdown(const Ice::Current&)
+    void shutdownNotice()
     {
-        mApp.stop();
+        mApp = 0;
     }
 
 private:
-    BasicRoutingServiceApp& mApp;
+    BasicRoutingServiceAppPtr mApp;
 };
 
 /** 
@@ -237,18 +249,18 @@ public:
      *  @param app 
      *  @param adapter The adapter is assumed to have been activated. 
      */
-    ReplicaManagement(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter) : mApp(app), mAdapter(adapter)
+    ReplicaManagement(BasicRoutingServiceApp* app, ::Ice::ObjectAdapterPtr adapter) : mApp(app), mAdapter(adapter)
     { 
     }
 
-    bool isActive(const Ice::Current&)
+    bool isActive(const ::Ice::Current&)
     {
-        return mApp.isActive();
+        return mApp->isActive();
     }
 
-    bool activate(const Ice::Current& = ::Ice::Current())
+    bool activate(const ::Ice::Current&)
     {
-        mApp.activated();
+        mApp->activated();
 
         for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
         {
@@ -258,9 +270,9 @@ public:
         return true;
     }
 
-    void standby(const Ice::Current& = ::Ice::Current())
+    void standby(const ::Ice::Current&)
     {
-        mApp.onStandby();
+        mApp->onStandby();
 
         for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
         {
@@ -268,12 +280,12 @@ public:
         }
     }
 
-    void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const ::Ice::Current&)
     {
         mListeners.push_back(listener);
     }
 
-    void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const ::Ice::Current&)
     {
         mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
     }
@@ -282,14 +294,14 @@ private:
     /**
      * Pointer to the object adapter we exist on.
      */
-    Ice::ObjectAdapterPtr mAdapter;
+    ::Ice::ObjectAdapterPtr mAdapter;
 
     /**
      * Listeners that we need to push state change notifications out to.
      */
     vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
 
-    BasicRoutingServiceApp& mApp;
+    BasicRoutingServiceAppPtr mApp;
 };
 
 bool BasicRoutingServiceApp::isActive()
@@ -351,7 +363,7 @@ void BasicRoutingServiceApp::stopListeningToStateReplicator()
  * Helper function to add some parameters to one of our registered interfaces in the ServiceLocator, so that
  * other components can look up our interfaces.
  */
-void BasicRoutingServiceApp::setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category)
+void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category)
 {
     // Add category as a parameter to enable other components look this component up.
     ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams;
@@ -362,56 +374,58 @@ void BasicRoutingServiceApp::setCategory(const Discovery::V1::ServiceManagementP
 /**
  * Register this component's primary public interfaces with the Service Locator.
  * This enables other Asterisk SCF components to locate the interfaces we are publishing.
+ *
+ * @param includeComponentService If true, registers our ComponentService interface
+ * in addition to all our other interfaces. While it is not uncommon to need to 
+ * re-register most of our interfaces due to a pause()/resume(), we only deregister
+ * the ComponentService during shutdown. So this variable would typically be true only 
+ * during initial startup. 
  */
-void BasicRoutingServiceApp::registerWithServiceLocator()
+void BasicRoutingServiceApp::registerWithServiceLocator(bool includeComponentService)
 {
-    // Get a proxy to the management interface for the Service Locator, so we can add ourselves into the system discovery mechanisms.
-    mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("LocatorServiceManagement.Proxy"));
-
-    if (mServiceLocatorManagement == 0)
-    {
-        throw IceBox::FailureException(__FILE__, __LINE__, "Configuration error: Unable to obtain proxy for property `LocatorServiceManagement.Proxy'");
-    }
-
     try
     {
         // Register our RoutingAdmin interface with the Service Locator.
-        Ice::ObjectPrx adminObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RoutingAdminObjectId));
+        ::Ice::ObjectPrx adminObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RoutingAdminObjectId));
         RoutingServiceAdminPrx adminPrx = RoutingServiceAdminPrx::checkedCast(adminObjectPrx);
-        string adminServiceGuid(getImplementationId() + "." + Routing::V1::RoutingServiceAdminDiscoveryCategory); // Should be unique for reporting.
+        string adminServiceGuid(mImplementationId + "." + Routing::V1::RoutingServiceAdminDiscoveryCategory); // Should be unique for reporting.
         mAdminManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(adminPrx, adminServiceGuid));
 
         setCategory(mAdminManagement, Routing::V1::RoutingServiceAdminDiscoveryCategory);
 
         // Register our RegistryLocator interface with the Service Locator.
-        Ice::ObjectPrx locatorObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RegistryLocatorObjectId));
+        ::Ice::ObjectPrx locatorObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RegistryLocatorObjectId));
         LocatorRegistryPrx locatorRegistryPrx = LocatorRegistryPrx::checkedCast(locatorObjectPrx);
-        string locatorServiceGuid(getImplementationId() + "." + Routing::V1::RoutingServiceLocatorRegistryDiscoveryCategory);  // Should be unique for reporting.
+        string locatorServiceGuid(mImplementationId + "." + Routing::V1::RoutingServiceLocatorRegistryDiscoveryCategory);  // Should be unique for reporting.
         mRegistryLocatorManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(locatorRegistryPrx, locatorServiceGuid));
 
         setCategory(mRegistryLocatorManagement, Routing::V1::RoutingServiceLocatorRegistryDiscoveryCategory);
 
-        // Register the ComponentService interface with the Service Locator.
-        Ice::ObjectPrx componentServiceObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(ComponentServiceId));
-        ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
-        string componentServiceGuid(getImplementationId() + "." + Routing::V1::ComponentServiceDiscoveryCategory);   // Should be unique for reporting.
-        mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
-
-        setCategory(mComponentServiceManagement, Routing::V1::ComponentServiceDiscoveryCategory);
-
-        if (mPublishTestInterface)
-        {
-            // Register our test servant as a facet of the ComponentService interface.
-            mAdapter->addFacet(mComponentTest, componentServicePrx->ice_getIdentity(), AsteriskSCF::System::Component::V1::ComponentTestFacet);
-        }
-
         // Register the SessionRouter interface with the Service Locator.
-        Ice::ObjectPrx sessionRouterObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(SessionRouterObjectId));
+        ::Ice::ObjectPrx sessionRouterObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(SessionRouterObjectId));
         AsteriskSCF::SessionCommunications::V1::SessionRouterPrx sessionRouterPrx = AsteriskSCF::SessionCommunications::V1::SessionRouterPrx::checkedCast(sessionRouterObjectPrx);
-        string sessionRouterGuid(getImplementationId() + "." + Routing::V1::SessionRouterDiscoveryCategory);   // Should be unique
+        string sessionRouterGuid(mImplementationId + "." + Routing::V1::SessionRouterDiscoveryCategory);   // Should be unique
         mSessionRouterManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(sessionRouterPrx, sessionRouterGuid));
 
         setCategory(mSessionRouterManagement, Routing::V1::SessionRouterDiscoveryCategory);
+
+        if (includeComponentService)
+        {
+            // Register the ComponentService interface with the Service Locator.
+            // Note that this interface has it's own adapter.
+            ::Ice::ObjectPrx componentServiceObjectPrx = mComponentServiceAdapter->createDirectProxy(mCommunicator->stringToIdentity(ComponentServiceId));
+            ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
+            string componentServiceGuid(mImplementationId + "." + Routing::V1::ComponentServiceDiscoveryCategory);   // Should be unique for reporting.
+            mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
+
+            setCategory(mComponentServiceManagement, Routing::V1::ComponentServiceDiscoveryCategory);
+
+            if (mPublishTestInterface)
+            {
+                // Register our test servant as a facet of the ComponentService interface.
+                mComponentServiceAdapter->addFacet(mComponentTest, componentServicePrx->ice_getIdentity(), AsteriskSCF::System::Component::V1::ComponentTestFacet);
+            }
+        }
     }
     catch(const std::exception& e)
     {
@@ -424,15 +438,23 @@ void BasicRoutingServiceApp::registerWithServiceLocator()
  * Deregister this component's primary public interfaces from the Service Locator.
  * This is done at shutdown, and whenever we want to keep other services from locating
  * our interfaces.
+ *
+ * @param includeComponentService If true, deregisters our ComponentService interface
+ * in addition to all our other interfaces, making this component unreachable from the 
+ * rest of Asterisk SCF. Should only be done during shutdown.
  */
-void BasicRoutingServiceApp::deregisterFromServiceLocator()
+void BasicRoutingServiceApp::deregisterFromServiceLocator(bool includeComponentService)
 {
     try
     {
         mRegistryLocatorManagement->unregister();
         mAdminManagement->unregister();
-        mComponentServiceManagement->unregister();
         mSessionRouterManagement->unregister();
+
+        if (includeComponentService)
+        {
+            mComponentServiceManagement->unregister();
+        }
     }
     catch(...)
     {
@@ -461,7 +483,7 @@ void BasicRoutingServiceApp::locateBridgeManager()
 /**
  * Locate our State Replicator using the Service Locator.
  */
-void BasicRoutingServiceApp::locateStateReplicator(bool isActive)
+void BasicRoutingServiceApp::locateStateReplicator()
 {
     BasicRoutingService::V1::RoutingStateReplicatorParamsPtr replicatorParams = new BasicRoutingService::V1::RoutingStateReplicatorParams();
     replicatorParams->category = BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
@@ -480,9 +502,14 @@ void BasicRoutingServiceApp::initialize()
 {
     try
     {
-        // Create the adapter.
+        // Create the primary adapter.
         mAdapter = mCommunicator->createObjectAdapter(mAppName);
 
+        // Create a separate adapter just for the component service. This is 
+        // to make it easy to deactivate all of our interfaces other than the
+        // component service for suspend()/resume().  
+        mComponentServiceAdapter = mCommunicator->createObjectAdapter(mAppName + ".ComponentService");
+
         bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".Standby", "no") == "yes");
 
         // Create the replication context.
@@ -528,13 +555,13 @@ void BasicRoutingServiceApp::initialize()
         mAdminInteface = new RoutingAdmin(mEndpointRegistry);
         mAdapter->add(mAdminInteface, mCommunicator->stringToIdentity(RoutingAdminObjectId));
 
-        // Create and publish the ComponentService interface.
-        mComponentService = new ComponentServiceImpl(*this);
-        mAdapter->add(mComponentService, mCommunicator->stringToIdentity(ComponentServiceId));
+        // Create and publish the ComponentService interface on it's own dedicated adapter. 
+        mComponentService = new ComponentServiceImpl(this);
+        mComponentServiceAdapter->add(mComponentService, mCommunicator->stringToIdentity(ComponentServiceId));
 
         // Create and publish our Replica interface support. This interface allows this component
         // to be activated or placed in standby mode. 
-        mReplicaManagement = new ReplicaManagement(*this, mAdapter);
+        mReplicaManagement = new ReplicaManagement(this, mAdapter);
         mAdapter->add(mReplicaManagement, mCommunicator->stringToIdentity(ReplicaServiceId));
 
         // Create and publish our state replicator listener interface.
@@ -551,13 +578,19 @@ void BasicRoutingServiceApp::initialize()
         }
 
         mAdapter->activate();
+        mComponentServiceAdapter->activate();
+
+        // Get a proxy to the management interface for the Service Locator manager.
+        // This isso we can add ourselves into the system discovery mechanisms.
+        mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("LocatorServiceManagement.Proxy"));
 
         // Get a proxy to the interface for the Service Locator.
+        // This is so we can find the other components we depend on.
         mServiceLocator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
 
         mInitialized = true;
     }
-    catch(const Ice::Exception &e)
+    catch(const ::Ice::Exception &e)
     {
         lg(Error) << "Problems in " << mAppName << BOOST_CURRENT_FUNCTION << e.what();
         throw e;
@@ -565,13 +598,13 @@ void BasicRoutingServiceApp::initialize()
 
     locateBridgeManager();
 
-    locateStateReplicator(mReplicationContext->isComponentActive());
+    locateStateReplicator();
 }
 
 /**
  * Implementation of the required IceBox::Service start method.
  */
-void BasicRoutingServiceApp::start(const string& name, const Ice::CommunicatorPtr& communicator, const Ice::StringSeq& args)
+void BasicRoutingServiceApp::start(const string& name, const ::Ice::CommunicatorPtr& communicator, const ::Ice::StringSeq& args)
 {
     lg(Info) << "Starting...";
 
@@ -585,15 +618,14 @@ void BasicRoutingServiceApp::start(const string& name, const Ice::CommunicatorPt
     else
     {
         mAdapter->activate();
+        mComponentServiceAdapter->activate();
     }
 
-    // Plug back into the Asterisk SCF discovery system so that the interfaces we provide
+    // Plug into the Asterisk SCF discovery system so that the interfaces we provide
     // can be located.
-    registerWithServiceLocator();
+    registerWithServiceLocator(true);
     
     // Register with the state replicator in case we are in standby mode. 
-    // This is done here because during initialize(), the reference to the State Replicator isn't
-    // yet available when Donatoe servant is created. 
     listenToStateReplicator();
 
     mRunning = true;
@@ -607,19 +639,51 @@ void BasicRoutingServiceApp::resume()
 {
     if (!mRunning)
     {
-        mAdapter->activate();
+        // If we're in standby by mode, listen for state replication.
+       listenToStateReplicator();
 
         // Plug back into the Asterisk SCF discovery system so that the interfaces we provide
         // can be located.
-        registerWithServiceLocator();
+        registerWithServiceLocator(false);
+
+        // Reactivate the primary adapter. 
+        mAdapter->activate();
     }
 
     mRunning = true;
 }
 
+/** 
+ * Utility function to suspend the service for a suspend() or stop().
+ */
+void BasicRoutingServiceApp::suspendService(bool shuttingDown)
+{
+    if (mRunning)
+    {
+        // Deregister our servants.
+        deregisterFromServiceLocator(shuttingDown);
+
+        // Remove our interfaces from the state replicator.
+        stopListeningToStateReplicator();
+
+        // Deactive the primary adapter. 
+        // The adapter that services the ComponentService stays active. 
+        mAdapter->deactivate();
+    }
+
+    mRunning = false;
+}
+
+/** 
+ * Handle a notice from the ComponentService. 
+ */
 void BasicRoutingServiceApp::suspend()
 {
-    stop();
+    lg(Info) << "Suspending...";
+
+    suspendService(false);
+
+    lg(Info) << "Suspended.";
 }
 
 /**
@@ -629,32 +693,25 @@ void BasicRoutingServiceApp::stop()
 {
     lg(Info) << "Stopping...";
 
-    if (mRunning)
-    {
-        deregisterFromServiceLocator();
-
-        // Remove our interfaces from the state replicator.
-        stopListeningToStateReplicator();
+    suspendService(true);
 
-        mAdapter->deactivate();
-    }
+    // Just in case we were in suspend() mode when told to stop.
+    mComponentServiceManagement->unregister();
 
-    //
-    // TODO: Needs to destroy communicator if it is not shared.
-    //
+    // Turn off our ComponentService interface. 
+    // Only a start() directly from IceBox can restart us now. 
+    mComponentServiceAdapter->deactivate();
 
-    mRunning = false;
     lg(Info) << "Stopped.";
 }
 
-
 } // end BasicRoutingService
 } // end AsteriskSCF
 
 
 extern "C"
 {
-ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(Ice::CommunicatorPtr communicator)
+ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(::Ice::CommunicatorPtr communicator)
 {
     return new AsteriskSCF::BasicRoutingService::BasicRoutingServiceApp;
 }
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 5d644d8..dff11e3 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -60,30 +60,35 @@ typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
 class BasicRoutingStateReplicatorService : public IceBox::Service
 {
 public:
-    BasicRoutingStateReplicatorService() { };
-    ~BasicRoutingStateReplicatorService()
-    {
-        mComponentService = 0;
-        mAdapter = 0;
-        mStateReplicator = 0;
-    };
+    BasicRoutingStateReplicatorService() : mRunning(false), mInitialized(false) {};
+    ~BasicRoutingStateReplicatorService() {};
+
+    void suspend();
+    void resume();
+
+    ////// Overrides of IceBox::Service methods. 
     virtual void start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
     virtual void stop();
 
 private:
-    void initialize(std::string appName, const Ice::CommunicatorPtr& ic);
-    void registerWithServiceLocator(const Ice::CommunicatorPtr& ic);
-    void deregisterFromServiceLocator();
+    void suspendService(bool shuttingDown);
+    void initialize(const std::string& appName);
+    void registerWithServiceLocator();
+    void deregisterFromServiceLocator(bool includeComponentService);
     std::string mAppName;
-    //vector<BasicRoutingStateReplicatorListenerPrx> mListeners;
-    Ice::ObjectAdapterPtr mAdapter;
+
+    bool mRunning;
+    bool mInitialized;
+    ::Ice::ObjectAdapterPtr mAdapter;
+    ::Ice::ObjectAdapterPtr mComponentServiceAdapter;
+    ::Ice::CommunicatorPtr mCommunicator;
     ServiceLocatorManagementPrx mServiceLocatorManagement;
     Discovery::V1::ServiceManagementPrx mComponentServiceManagement;
     Discovery::V1::ServiceManagementPrx mStateReplicationManagement;
     ConfiguredIceLoggerPtr mIceLogger;
-    ComponentServicePtr mComponentService;
     AsteriskSCF::BasicRoutingService::RoutingStateReplicatorIPtr mStateReplicator;
 };
+typedef ::IceUtil::Handle<BasicRoutingStateReplicatorService> BasicRoutingStateReplicatorServicePtr;
 
 static const string ComponentServiceId("BasicRoutingStateReplicatorComponent");
 static const string ServiceDiscoveryId("BasicRoutingStateReplicatorService");
@@ -95,26 +100,26 @@ static const string ServiceDiscoveryId("BasicRoutingStateReplicatorService");
 class ComponentServiceImpl : public ComponentService
 {
 public:
-    ComponentServiceImpl(BasicRoutingStateReplicatorService &service) : mService(service) {}
+    ComponentServiceImpl(BasicRoutingStateReplicatorService* service) : mService(service) {}
 
 public: // Overrides of the ComponentService interface.
-    virtual void suspend(const ::Ice::Current& = ::Ice::Current())
+    virtual void suspend(const ::Ice::Current&)
     {
-        // TBD... focussed more on the component than the replicator.
+        mService->suspend();
     }
 
-    virtual void resume(const ::Ice::Current& = ::Ice::Current())
+    virtual void resume(const ::Ice::Current&)
     {
-        // TBD...
+        mService->resume();
     }
 
-    virtual void shutdown(const ::Ice::Current& = ::Ice::Current())
+    virtual void shutdown(const ::Ice::Current&)
     {
-        mService.stop();
+        mService->stop();
     }
 
 private:
-    BasicRoutingStateReplicatorService& mService;
+    BasicRoutingStateReplicatorServicePtr mService;
 };
 
 class BasicRoutingStateReplicatorCompare : public ServiceLocatorParamsCompare
@@ -137,37 +142,36 @@ private:
 typedef IceUtil::Handle<BasicRoutingStateReplicatorCompare> BasicRoutingStateReplicatorComparePtr;
 
 /**
+ * Helper function to add some parameters to one of our registered interfaces in the ServiceLocator, so that
+ * other components can look up our interfaces.
+ */
+void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category)
+{
+    // Add category as a parameter to enable other components look this component up.
+    ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams;
+    genericparams->category = category;
+    serviceManagement->addLocatorParams(genericparams, "");
+}
+
+/**
  * Register this component's primary public interfaces with the Service Locator.
  * This enables other Asterisk SCF components to locate our interfaces.
  */
-void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::CommunicatorPtr& ic)
+void BasicRoutingStateReplicatorService::registerWithServiceLocator()
 {
     try
     {
-        // Get a proxy to the management interface for the Service Locator, so we can add ourselves into the system discovery mechanisms.
-        mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(ic->propertyToProxy("LocatorServiceManagement.Proxy"));
-
-        if (mServiceLocatorManagement == 0)
-        {
-            lg(Error) << "Unable to obtain proxy to ServiceLocatorManagement interface. Check config file. This component can't be found until this is corrected." << endl;
-            return;
-        }
-
         // Get a proxy to our ComponentService interface and add it to the Service Locator.
-        Ice::ObjectPrx componentServiceObjectPrx = mAdapter->createDirectProxy(ic->stringToIdentity(ComponentServiceId));
+        // Note that this interface has its own adapter.
+        Ice::ObjectPrx componentServiceObjectPrx = mComponentServiceAdapter->createDirectProxy(mCommunicator->stringToIdentity(ComponentServiceId));
         ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
 
-        // The GUID passed in to add service needs to be unique for reporting.
         string componentServiceGuid(AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory);
         mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
+        setCategory(mComponentServiceManagement,  AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory);
 
-        // Add category as a parameter to enable other components look this component up.
-        ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
-        genericparams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory;
-
-        mComponentServiceManagement->addLocatorParams(genericparams, "");
-
-        Ice::ObjectPrx stateReplicatorObjectPrx = mAdapter->createDirectProxy(ic->stringToIdentity(ServiceDiscoveryId));
+        // Get a proxy to our Replicator interface and add it to the Service Locator.
+        Ice::ObjectPrx stateReplicatorObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(ServiceDiscoveryId));
         RoutingStateReplicatorPrx stateReplicatorPrx = RoutingStateReplicatorPrx::checkedCast(stateReplicatorObjectPrx);
 
         string stateReplicationGuid(AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory);
@@ -176,18 +180,18 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
         ServiceLocatorParamsPtr discoveryParams = new ServiceLocatorParams();
         discoveryParams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
 
-        string replicatorName = ic->getProperties()->getPropertyWithDefault(mAppName + ".InstanceName", "default");
-        BasicRoutingStateReplicatorCompare* nameCompare = new BasicRoutingStateReplicatorCompare(replicatorName);
-        ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(mAdapter->addWithUUID(nameCompare));
+        string replicatorName = mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".InstanceName", "default");
+        ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+                                                    mAdapter->addWithUUID(new BasicRoutingStateReplicatorCompare(replicatorName)));
 
         string compareGuid = IceUtil::generateUUID();
         mServiceLocatorManagement->addCompare(compareGuid, compareProxy);
         mStateReplicationManagement->addLocatorParams(discoveryParams, compareGuid);
-
     }
     catch(...)
     {
-        lg(Error) << "Exception in " << mAppName << " registerWithServiceLocator()" << endl;
+        lg(Error) << "Exception in " << mAppName << BOOST_CURRENT_FUNCTION << endl;
+        throw;
     }
 }
 
@@ -195,52 +199,152 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
  * Deregister this component's primary public interfaces from the Service Locator.
  * This is done at shutdown, and whenever we want to keep other services from locating
  * our interfaces.
+ *
+ * @param includeComponentService If true, deregisters our ComponentService interface
+ * in addition to all our other interfaces, making this component unreachable from the 
+ * rest of Asterisk SCF. Should only be done during shutdown.
  */
-void BasicRoutingStateReplicatorService::deregisterFromServiceLocator()
+void BasicRoutingStateReplicatorService::deregisterFromServiceLocator(bool includeComponentService)
 {
     try
     {
-        mComponentServiceManagement->unregister();
+        mStateReplicationManagement->unregister();
+
+        if (includeComponentService)
+        {
+            mComponentServiceManagement->unregister();
+        }
     }
     catch(...)
     {
-        lg(Error) << "Exception in deregisterFromServiceLocator()." << endl;
+        lg(Error) << "Exception in " << BOOST_CURRENT_FUNCTION << endl;
     }
 }
 
-void BasicRoutingStateReplicatorService::initialize(const std::string appName, const Ice::CommunicatorPtr& ic)
+void BasicRoutingStateReplicatorService::initialize(const std::string& appName)
 {
-    mAdapter = ic->createObjectAdapter(appName + ".RoutingReplicator");
+    mAppName = appName;
+
+    // This is the primary adapter for this component.
+    mAdapter = mCommunicator->createObjectAdapter(mAppName);
+
+    // Create a separate adapter just for the component service. This is 
+    // to make it easy to deactivate all of our interfaces other than the
+    // component service to support suspend()/resume().
+    mComponentServiceAdapter = mCommunicator->createObjectAdapter(mAppName + ".ComponentService");
 
     // setup logging client
     mIceLogger = createIceLogger(mAdapter);
     getLoggerFactory().setLogOutput(mIceLogger->getLogger());
 
-    mAppName = appName;
-
-    // Create and publish our ComponentService interface support.
-    mComponentService = new ComponentServiceImpl(*this);
-    mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
+    // Create and publish our ComponentService interface support on its own adapter.
+    mComponentServiceAdapter->add(new ComponentServiceImpl(this), mCommunicator->stringToIdentity(ComponentServiceId));
 
     // Create our instance of the StateReplicator template. 
     mStateReplicator = new RoutingStateReplicatorI();
-    mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
+    mAdapter->add(mStateReplicator, mCommunicator->stringToIdentity(ServiceDiscoveryId));
 
+    // Activate our adapters.
     mAdapter->activate();
+    mComponentServiceAdapter->activate();
+
+    // Get a proxy to the management interface for the Service Locator, so we can add 
+    // ourselves into the system discovery mechanisms.
+    mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("LocatorServiceManagement.Proxy"));
+
+    if (mServiceLocatorManagement == 0)
+    {
+        lg(Error) << "Unable to obtain proxy to ServiceLocatorManagement interface. Check config file. " << BOOST_CURRENT_FUNCTION << endl;
+    }
+
+    mInitialized = true;
 }
 
 void BasicRoutingStateReplicatorService::start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args)
 {
-    initialize(name, ic);
+    mCommunicator = ic;
+
+    if (!mInitialized)
+    {
+        initialize(name);
+    }
+    else
+    {
+        mAdapter->activate();
+        mComponentServiceAdapter->activate();
+    }
+
     // Plug into the Asterisk SCF discovery system so that the interfaces we provide
     // can be located.
-    registerWithServiceLocator(ic);
+    registerWithServiceLocator();
+
+    mRunning = true;
+}
+
+void BasicRoutingStateReplicatorService::resume()
+{
+    if (!mRunning)
+    {
+        mAdapter->add(mStateReplicator, mCommunicator->stringToIdentity(ServiceDiscoveryId));
+
+        // Plug back into the Asterisk SCF discovery system so that the interfaces we provide
+        // can be located.
+        registerWithServiceLocator();
+
+        // Reactivate the primary adapter. 
+        mAdapter->activate();
+    }
+
+    mRunning = true;
+}
+
+/** 
+ * Utility function to suspend the service for a suspend() or stop().
+ */
+void BasicRoutingStateReplicatorService::suspendService(bool shuttingDown)
+{
+    if (mRunning)
+    {
+        // Deregister our servants.
+        deregisterFromServiceLocator(shuttingDown);
+
+        // Deactive the primary adapter. 
+        // The adapter that services the ComponentService stays active. 
+        mAdapter->deactivate();
+    }
+
+    mRunning = false;
+}
+
+/** 
+ * Handle a notice from the ComponentService. 
+ */
+void BasicRoutingStateReplicatorService::suspend()
+{
+    lg(Info) << "Suspending...";
+
+    suspendService(false);
+
+    lg(Info) << "Suspended.";
 }
 
+/**
+ * Implementation of the required IceBox::Service stop method.
+ */
 void BasicRoutingStateReplicatorService::stop()
 {
-    // Remove our interfaces from the service locator.
-    deregisterFromServiceLocator();
+    lg(Info) << "Stopping...";
+
+    suspendService(true);
+
+    // Just in case we were in suspend() mode when told to stop.
+    mComponentServiceManagement->unregister();
+
+    // Turn off our ComponentService interface. 
+    // Only a start() directly from IceBox can restart us now. 
+    mComponentServiceAdapter->deactivate();
+
+    lg(Info) << "Stopped.";
 }
 
 extern "C"
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 30b951b..f5e52d9 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -106,7 +106,6 @@ public:
         mEndpointLocatorMap.erase(locatorId);
     }
 
-
     void insertLocatorMapItem(const string& key, const RegisteredLocator& locator)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -239,6 +238,18 @@ public:
         mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
     }
 
+    /**
+     * Utiltity to do thread-safe check for existing map entry. 
+     */
+    bool locatorExists(const std::string& locatorId, EndpointLocatorMapConstIterator& existing)
+    {
+        EndpointLocatorMapConstIterator end;
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+        existing = mEndpointLocatorMap.find(locatorId);
+        return existing != mEndpointLocatorMap.end();
+    }
+
     boost::shared_mutex mLock;
 
     ScriptProcessorPtr mScriptProcessor;
@@ -258,8 +269,11 @@ class LookupResultCollector : public IceUtil::Shared
 public:
     /**
      * Constructor.
-     * @param cb Ice callback.
-     * @param numVotes The number of times isSupported will be called.
+     * @param callback AMD callback for the operation that
+     * initiated this lookup. 
+     * @param destinatin The destination being looked up.
+     * @param eventPublisher 
+     * @param numVotes The number of replies expected.
      */
     LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback, 
                           const std::string& destination,
@@ -285,6 +299,7 @@ public:
 
     /**
      * Collect results of AMI lookups from multiple EndpointLocators. 
+     * We only care about the first reply that gives us endpoints.
      */
     void lookupResult(const EndpointSeq& endpoints)
     {
@@ -303,7 +318,7 @@ public:
             mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
         }
 
-        assert(mNumVotes > 0); // isSupported was called too many times
+        assert(mNumVotes > 0);
 
         if (--mNumVotes == 0 && mCallback)
         {
@@ -418,27 +433,6 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
 }
 
 /**
- * Non-member utiltity to do thread-safe check for existing map entry. 
- */
-bool locatorExists(const EndpointLocatorMap& map, const std::string& locatorId, boost::shared_mutex& mtx, EndpointLocatorMapConstIterator& existing)
-{
-        EndpointLocatorMapConstIterator end;
-
-        {   // critical scope
-            boost::shared_lock<boost::shared_mutex> lock(mtx);
-
-            existing = map.find(locatorId);
-            end = map.end();
-        }
-
-        if (existing == end)
-        {
-            return false;
-        }
-        return true;
-}
-
-/**
  * Register an EndpointLocator that can provide endpoints.
  *   @param id A unique identifier for the added EndpointLocator.
  *   @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
@@ -452,7 +446,7 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
         lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString() << std::endl;
 
         EndpointLocatorMapIterator existing;
-        bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
+        bool exists = mImpl->locatorExists( locatorId, existing);
 
         if (exists)
         {
@@ -484,7 +478,7 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
         lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
 
         EndpointLocatorMapIterator existing;
-        bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
+        bool exists = mImpl->locatorExists(locatorId, existing);
 
         if (!exists)
         {
@@ -518,7 +512,7 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
     try
     {
         EndpointLocatorMapIterator existing;
-        bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
+        bool exists = mImpl->locatorExists(locatorId, existing);
 
         if (!exists)
         {
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index a79d5db..a0c5866 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -28,44 +28,20 @@ namespace AsteriskSCF
 namespace BasicRoutingService
 {
 
-typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
-
-/** 
- * For each transaction id, we're going to cache the all the state items for the operation.
- * The reason for this is that we don't want to rely on the order in which we will receive the state updates. 
- * This class is used to hold all the state updates for a given operation. 
- * 
- * When an item is called for from the cache, we'll apply all the available state updates we have, 
- * in order, up to the highest we have (without missing a state, since each state depends on the results 
- * of the previous.) 
- */
-template<typename T>
-struct OperationReplicaItem
-{
-public:
-    T mOperation;
-    StateItemMapType mItems;
-};
-
-typedef std::map<std::string, OperationReplicaItem<RouteSessionOperationPtr> > RouteSessionMapType;
-typedef std::map<std::string, OperationReplicaItem<ConnectBridgedSessionsWithDestinationOperationPtr> > ConnectBridgeWithDestMapType;
-
 class OperationReplicaCachePriv
 {
 public: 
 
     OperationReplicaCachePriv(const SessionContextPtr& sessionContext) 
-                      : mSessionContext(sessionContext)
+                      : mSessionContext(sessionContext),
+                        mRouteSessionCache(new OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation>(sessionContext)),
+                        mConnectBridgedSessionsWithDestCache(new OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation>(sessionContext))
     {
     }
 
     SessionContextPtr mSessionContext;
-
-    RouteSessionMapType routeSessionReplicas;
-    ConnectBridgeWithDestMapType connectBridgedWithDestReplicas;
-
-    boost::shared_mutex mRouteSessionLock;
-    boost::shared_mutex mConnectBridgedWithDestLock;
+    boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation> > mRouteSessionCache;
+    boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation> > mConnectBridgedSessionsWithDestCache;
 };
 
 OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionContext) 
@@ -73,166 +49,20 @@ OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionCon
 {
 }
 
-/**
- * For a specific type of operation, cache the update item.
- */
-template<typename M, typename O, typename OPtr>
-void cacheIt(M& map, const OperationStateItemPtr& item, const SessionContextPtr& context)
-{
-    // See if this transaction is in the cache.
-    M::iterator i = map.find(item->transactionId);
-    if (i == map.end())
-    {
-        // Add an entry to the cache.
-        OperationReplicaItem<OPtr> replica;
-        map[item->transactionId] = replica;
-
-        i = map.find(item->transactionId);
-    }
-    (*i).second.mItems[item->key] = item;
-
-    // If we haven't created the replica yet, do so now. 
-    if ((*i).second.mOperation.get() == 0)
-    {
-        OPtr work(O::createReplica(context));
-        (*i).second.mOperation = work;
-    }
-
-    // Update the replicated object with newest state update. 
-    (*i).second.mOperation->reflectUpdate(item);
-}
-
-void OperationReplicaCache::cacheOperation(OperationType type, const OperationStateItemPtr& item)
-{
-    switch(type)
-    {
-    case ROUTE_SESSION_OP:
-        {
-        boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-
-        cacheIt<RouteSessionMapType, 
-                RouteSessionOperation, 
-                RouteSessionOperationPtr>(mPriv->routeSessionReplicas, 
-                                          item,
-                                          mPriv->mSessionContext);
-        }
-            break;
-
-    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
-        {
-        boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-
-        cacheIt<ConnectBridgeWithDestMapType, 
-                ConnectBridgedSessionsWithDestinationOperation, 
-                ConnectBridgedSessionsWithDestinationOperationPtr>(mPriv->connectBridgedWithDestReplicas, 
-                                                                    item,
-                                                                    mPriv->mSessionContext);            }
-        break;
-
-    case CONNECT_BRIDGED_SESSIONS_OP:
-        // Not replicating this type. 
-        break;
-    }
-}
-
-/** 
- * For a specific type of operation, fetch a cached entry if one is available.
- */
-template<typename M, typename OPtr>
-bool fetchIt(M& map, const std::string& transactionId, OPtr& ref)
-{
-    M::iterator i = map.find(transactionId);
-    if (i == map.end())
-    {
-        return false;
-    }
-
-    ref = (*i).second.mOperation;
-    map.erase(i);
-
-    ref->fastForwardReplica();
-
-    return true;
-}
-
-bool OperationReplicaCache::fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref)
-{
-    boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-
-    return fetchIt<RouteSessionMapType, 
-                   RouteSessionOperationPtr> (mPriv->routeSessionReplicas, transactionId, ref);
-}
-
-bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
-{
-    boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-
-    return fetchIt<ConnectBridgeWithDestMapType, 
-                   ConnectBridgedSessionsWithDestinationOperationPtr> (mPriv->connectBridgedWithDestReplicas, transactionId, ref);
-}
-
-void OperationReplicaCache::dropOperation(OperationType type, std::string transactionId)
+void OperationReplicaCache::clearCache()
 {
-
-    switch(type)
-    {
-    case ROUTE_SESSION_OP:
-        {
-            boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-
-            RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
-            if (i != mPriv->routeSessionReplicas.end())
-            {
-                 mPriv->routeSessionReplicas.erase(i);
-            }
-        }
-        break;
-
-    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
-        {
-            boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-
-            ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
-            if (i != mPriv->connectBridgedWithDestReplicas.end())
-            {
-                 mPriv->connectBridgedWithDestReplicas.erase(i);
-            }
-        }
-        break;
-    }
+     mPriv->mRouteSessionCache->clear();
+     mPriv->mConnectBridgedSessionsWithDestCache->clear();
 }
 
-void OperationReplicaCache::clearCache(OperationType type)
+boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation> > OperationReplicaCache::getRouteSessionCache()
 {
-    switch(type)
-    {
-    case ROUTE_SESSION_OP:
-        {
-        boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-        mPriv->routeSessionReplicas.clear();
-        }
-        break;
-
-    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
-        {
-        boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-        mPriv->connectBridgedWithDestReplicas.clear();
-        }
-        break;
-    }
+    return mPriv->mRouteSessionCache;
 }
 
-void OperationReplicaCache::clearCache()
+boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation> > OperationReplicaCache::getConnectBridgedSessionsWithDestCache()
 {
-    {
-    boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-    mPriv->routeSessionReplicas.clear();
-    }
-
-    {
-    boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-    mPriv->connectBridgedWithDestReplicas.clear();
-    }
+    return mPriv->mConnectBridgedSessionsWithDestCache;
 }
 
 } // end BasicRoutingService
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index 0424881..5638006 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -31,27 +31,124 @@ namespace AsteriskSCF
 namespace BasicRoutingService
 {
 
-enum OperationType
-{
-    ROUTE_SESSION_OP = 0,
-    CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP = 1,
-    CONNECT_BRIDGED_SESSIONS_OP = 2
-};
+template<typename O>
+class OperationCachePriv;
 
 class OperationReplicaCachePriv;
 class SessionContext;
 
+typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
+
+/** 
+ * For each transaction id, we're going to cache the all the state items for the operation.
+ * The reason for this is that we don't want to rely on the order in which we will receive the state updates.
+ * This class is used to hold all the state updates for a given operation. 
+ * 
+ * When an item is called for from the cache, we'll apply all the available state updates we have, 
+ * in order, up to the highest we have (without missing a state, since each state depends on the results 
+ * of the previous.) The operation must have fastForwardReplica(...) operation to support this.
+ */
+template<typename T>
+struct OperationReplicaItem
+{
+public:
+    T mOperation;
+    StateItemMapType mItems;
+};
+
+/**
+ * Template to cache some type of operation. 
+ */
+template<typename O>
+class OperationCache
+{
+public:
+
+    typedef std::map<std::string, OperationReplicaItem<boost::shared_ptr<O>>> OpMapType;
+
+    OperationCache(const SessionContextPtr& sessionContext) 
+                      : mSessionContext(sessionContext)
+    {
+    }
+
+    void cacheOperationState(const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        // See if this transaction is in the cache.
+        OpMapType::iterator i = mReplicas.find(item->transactionId);
+        if (i ==  mReplicas.end())
+        {
+            // Add an entry to the cache.
+            OperationReplicaItem< boost::shared_ptr<O> > replica;
+            mReplicas[item->transactionId] = replica;
+
+            i = mReplicas.find(item->transactionId);
+        }
+
+        // Add this item to the replica's collection of state items.
+        (*i).second.mItems[item->key] = item;
+
+        // If we haven't created the replica yet, do so now. 
+        if ((*i).second.mOperation.get() == 0)
+        {
+            boost::shared_ptr<O> workPtr(O::createReplica(mSessionContext));
+            (*i).second.mOperation = workPtr;
+        }
+
+        // Update the replicated object with newest state update. 
+        (*i).second.mOperation->reflectUpdate(item);
+    }
+
+    bool fetchOperation(std::string transactionId, boost::shared_ptr<O>& outRef)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        OpMapType::iterator i = mReplicas.find(transactionId);
+        if (i == mReplicas.end())
+        {
+            return false;
+        }
+
+        outRef = (*i).second.mOperation;
+        mReplicas.erase(i);
+
+        outRef->fastForwardReplica();
+
+        return true;
+    }
+
+    void dropOperation(std::string transactionId)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+        OpMapType::iterator i = mReplicas.find(transactionId);
+        if (i != mReplicas.end())
+        {
+            mReplicas.erase(i);
+        }
+    }
+
+    void clear()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mReplicas.clear();
+    }
+
+private:
+    SessionContextPtr mSessionContext;
+    OpMapType mReplicas;
+    boost::shared_mutex mLock;
+};
+
 class OperationReplicaCache
 {
 public: 
     OperationReplicaCache(const boost::shared_ptr<SessionContext>& sessionContext);
 
-    void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
-    bool fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref);
-    bool fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref);
-    void dropOperation(OperationType type, std::string transactionId);
+    boost::shared_ptr< AsteriskSCF::BasicRoutingService::OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation> > getRouteSessionCache();
+    boost::shared_ptr< AsteriskSCF::BasicRoutingService::OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation> > getConnectBridgedSessionsWithDestCache();
 
-    void clearCache(OperationType type);
     void clearCache();
 
 private:
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index c62cee1..f0d9e32 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -66,7 +66,7 @@ public:
             void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
             {
                 // The operation cache keeps all the collected state for an operation under the transaction id. 
-                mImpl->mOperationReplicaCache->dropOperation(ROUTE_SESSION_OP, opState->transactionId);
+                mImpl->mOperationReplicaCache->getRouteSessionCache()->dropOperation(opState->transactionId);
             }
 
             void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
@@ -81,7 +81,7 @@ public:
 
             void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
             {
-                mImpl->mOperationReplicaCache->dropOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState->transactionId);
+                mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->dropOperation(opState->transactionId);
             }
 
             void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
@@ -131,32 +131,32 @@ public:
 
             void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
             {
-                mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
+                mImpl->mOperationReplicaCache->getRouteSessionCache()->cacheOperationState(opState);
             }
 
             void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
             {
-                mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
+                mImpl->mOperationReplicaCache->getRouteSessionCache()->cacheOperationState(opState);
             }
 
             void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr& opState)
             {
-                mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
+                mImpl->mOperationReplicaCache->getRouteSessionCache()->cacheOperationState(opState);
             }
 
             void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
             {
-                mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
+                mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->cacheOperationState(opState);
             }
 
             void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
             {
-                mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
+                mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->cacheOperationState(opState);
             }
 
             void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr& opState)
             {
-                mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
+                mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->cacheOperationState(opState);
             }
 
             void visitEndpointLocatorState(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorStatePtr& item)
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 9e87e48..c5909ab 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -180,7 +180,7 @@ void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunication
     
     // Check the cache for a replica with this transaction Id.
     RouteSessionOperationPtr routeSessionOp;
-    if (mImpl->mOperationReplicaCache->fetchRouteSessionOp(transactionId, routeSessionOp))
+    if (mImpl->mOperationReplicaCache->getRouteSessionCache()->fetchOperation(transactionId, routeSessionOp))
     {
         routeSessionOp->rehostReplica(cb, current, mImpl.get());
         WorkPtr replicaOp(routeSessionOp);
@@ -214,7 +214,7 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const ::Asterisk
 
     // Check the cache for a replica with this transaction Id.
     ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
-    if (mImpl->mOperationReplicaCache->fetchConnectBridgedSessionsWithDestOp(transactionId, connectBridgedSessionsWithDestOp))
+    if (mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->fetchOperation(transactionId, connectBridgedSessionsWithDestOp))
     {
         connectBridgedSessionsWithDestOp->rehostReplica(cb, current, mImpl.get());
         WorkPtr replicaOp(connectBridgedSessionsWithDestOp);

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/routing.git



More information about the asterisk-scf-commits mailing list