[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue Apr 12 20:35:59 CDT 2011


branch "route_replica" has been updated
       via  cc3b28f14b514d647fa4933a0a54c6c13705191b (commit)
      from  87dec489be1edc35387c4a774a33b4a59ea68112 (commit)

Summary of changes:
 local-slice/BasicRoutingStateReplicationIf.ice     |  127 ++++++------
 local-slice/CMakeLists.txt                         |    4 +
 src/BasicRoutingServiceApp.cpp                     |   40 +++-
 src/BasicRoutingStateReplicator.h                  |    3 +-
 src/BasicRoutingStateReplicatorApp.cpp             |    4 +-
 src/BasicRoutingStateReplicatorListener.cpp        |   75 ++++++--
 src/CMakeLists.txt                                 |    4 +-
 src/ConnectBridgedSessionsOperation.cpp            |   43 ++++-
 src/ConnectBridgedSessionsOperation.h              |   19 ++-
 ...nectBridgedSessionsWithDestinationOperation.cpp |  216 +++++++++++++++++++-
 ...onnectBridgedSessionsWithDestinationOperation.h |   54 ++++-
 src/EndpointRegistry.cpp                           |  137 ++++++++++++-
 src/OperationReplicaCache.cpp                      |  195 ++++++++++++++++++
 src/OperationReplicaCache.h                        |   62 ++++++
 src/RouteSessionOperation.cpp                      |  188 +++++++++++++-----
 src/RouteSessionOperation.h                        |   65 +++++-
 src/RoutingServiceEventPublisher.cpp               |   48 -----
 src/RoutingServiceEventPublisher.h                 |    2 -
 src/SessionRouter.cpp                              |  106 +++-------
 src/SessionRouter.h                                |   16 +-
 src/SessionRouterOperation.h                       |   55 ++++-
 21 files changed, 1142 insertions(+), 321 deletions(-)
 create mode 100644 local-slice/CMakeLists.txt
 create mode 100644 src/OperationReplicaCache.cpp
 create mode 100644 src/OperationReplicaCache.h


- Log -----------------------------------------------------------------
commit cc3b28f14b514d647fa4933a0a54c6c13705191b
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue Apr 12 20:34:53 2011 -0500

    Big fix for caching the replicated operations.

diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
index 0a0d740..4b696c1 100644
--- a/local-slice/BasicRoutingStateReplicationIf.ice
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -35,19 +35,39 @@ module V1
        string name;
     };
 
-    class RoutingStateItem
+    ///////////////////////////////////////////////////////////////////////
+	// These classes and interfaces implement the replication 
+	// pattern of Asterisk SCF.
+
+    ["visitor"] local class RoutingStateItemVisitor
+    {
+    };
+
+	/**
+	 * Base class for an item that will be replicated.
+	 * The key will be unique among all state items stored in the
+	 * state replicator. 
+	 */
+    ["visitor:RoutingStateItemVisitor"] class RoutingStateItem
     {
 	    string key;
     };
 
     sequence<RoutingStateItem> RoutingStateItemSeq;
 
+	/**
+	 * Listener interface. Typically implemented by
+	 * a routing service in standby mode. 
+	 */
     interface RoutingStateReplicatorListener
     {
         void stateRemoved(Ice::StringSeq itemKeys);
         void stateSet(RoutingStateItemSeq items);
     };
 
+	/**
+	 * The state replicator interface. 
+	 */
     interface RoutingStateReplicator
     {
        void addListener(RoutingStateReplicatorListener *listener);
@@ -58,106 +78,84 @@ module V1
        idempotent RoutingStateItemSeq getAllState();
     };
 
-    ///////////////////////////////////
-    // These state items represent the state transistions of the RouteSession operation.
+	/** 
+	 * All transactional operations will derive from this. 
+	 */
+	class OperationStateItem extends RoutingStateItem
+    {
+        string transactionId; 
+    };
+
+    ///////////////////////////////////////////////////////////////////////
+    // These state items represent the state transistions
+	//  of the RouteSession operation.
 
-	/**
+    /**
 	 * Indicates the RouteSessionOperation started. 
-	 * The key (in the base state item) is the transactionId of this operation + ".START". 
+	 * The key (in the base state item) is the transactionId of this 
+	 * operation + RouteSessionOpStartKeyMod 
 	 */
-    class RouteSessionOpStart extends RoutingStateItem
+    class RouteSessionOpStart extends OperationStateItem
     {
-        string transactionId; 
         AsteriskSCF::SessionCommunications::V1::Session *source;
         string destination;
     };
-
     const string RouteSessionOpStartKeyMod = ".START";
 
 	/**
 	 * Indicates the RouteSessionOperation is waiting for an AMI endpoint lookup() reply. 
-	 * The key (in the base state item) is the transactionId of this operation + ".WAITLOOKUP". 
+	 * The key (in the base state item) is the transactionId of this 
+	 * operation + RouteSessionOpWaitLookupKeyMod
 	 */
-    class RouteSessionOpWaitLookupState extends RoutingStateItem
+    class RouteSessionOpWaitLookupState extends OperationStateItem
     {
-        string transactionId; 
     };
-
     const string RouteSessionOpWaitLookupKeyMod = ".WAITLOOKUP";
 
 	/**
 	 * Indicates the RouteSessionOperation is going to create the bridge. 
-	 * The key (in the base state item) is the transactionId of this operation + ".BRIDGING". 
+	 * The key (in the base state item) is the transactionId of this 
+	 * operation + RouteSessionOpBridgingKeyMod
 	 */
-    class RouteSessionOpBridgingState extends RoutingStateItem
+    class RouteSessionOpBridgingState extends OperationStateItem
     {
-        string transactionId; 
         AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
     };
-
     const string RouteSessionOpBridgingKeyMod = ".BRIDGING";
 
-
-    /////////////////////////////////////
-    // These state items represent the state transistions of the ConnectBridgedSessionsWithDestination operation.
+    ///////////////////////////////////////////////////////////////////////
+    // These state items represent the state transistions of the 
+	// ConnectBridgedSessionsWithDestination operation.
 
 	/**
 	 * Indicates the ConnectBridgedSessionsWithDestinationOperation started. 
-	 * The key (in the base state item) is the transactionId of this operation. 
+	 * The key (in the base state item) is the transactionId of this 
+	 * operation + ConnectBridgedSessionsWithDestStartKeyMod
 	 */
-    class ConnectBridgedSessionsWithDestinationOpStart
+    class ConnectBridgedSessionsWithDestinationOpStart extends OperationStateItem
     {
-        string transactionId; 
         AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
         string destination;
     };
+	const string ConnectBridgedSessionsWithDestStartKeyMod = ".START";
 
-    class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends RoutingStateItem
+    class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends OperationStateItem
     {
-        string transactionId; 
     };
+    const string ConnectBridgedSessionsWithDestWaitLookupKeyMod = ".WAITLOOKUP";
 
-    class ConnectBridgedSessionsWithDestinationOpBridgingState extends RoutingStateItem
+    class ConnectBridgedSessionsWithDestinationOpBridgingState extends OperationStateItem
     {
-        string transactionId; 
         AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
     };
+	const string ConnectBridgedSessionsWithDestBridgingKeyMod = ".BRIDGING";
 
-	/**
-	 * Indicates the ConnectBridgedSessionsWithDestinationOperation completed. 
-	 * The key (in the base state item) is the transactionId of this operation. 
-	 */
-    class ConnectBridgedSessionsWithDestinationOpComplete extends RoutingStateItem
-    {
-        string transactionId; 
-    };
+    ///////////////////////////////////////////////////////////////////////
+    // NOTE: There is no value in replicating the ConnectBridgedSessions 
+	// operation. No intermediate results are obtained to cache. 
 
-    /////////////////////////////////////
-    // These state items represent the state transistions of the ConnectBridgedSessions operation.
 
-	/**
-	 * Indicates the ConnectBridgedSessionsOperation started. 
-	 * The key (in the base state item) is the transactionId of this operation. 
-	 */
-    class ConnectBridgedSessionsOpStart extends RoutingStateItem
-    {
-        string transactionId; 
-        AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
-        AsteriskSCF::SessionCommunications::V1::Session *bridgedSession;
- 
-        string destination;
-    };
-
-	/**
-	 * Indicates the ConnectBridgedSessionsOperation completed. 
-	 * The key (in the base state item) is the transactionId of this operation. 
-	 */
-    class ConnectBridgedSessionsOpComplete extends RoutingStateItem
-    {
-        string transactionId; 
-    };
-
-    ///////////////////////////////////
+    ///////////////////////////////////////////////////////////////////////
     // Endpoint locator state items
 
 	/**  
@@ -178,6 +176,17 @@ module V1
     { 
     };
 
+	/**  
+	 * Represents a change of the endpoint locators 
+	 * managed ids. 
+	 * The key (in the base state item) is the locatorId. 
+	 */
+	class EndpointLocatorSetDestIds extends RoutingStateItem
+	{
+	    AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
+	};
+
+
 }; //module V1
 }; //module BasicRouting
 }; //module Asterisk SCF
diff --git a/local-slice/CMakeLists.txt b/local-slice/CMakeLists.txt
new file mode 100644
index 0000000..71102ba
--- /dev/null
+++ b/local-slice/CMakeLists.txt
@@ -0,0 +1,4 @@
+# Compile Basic Routing Service Component's own slice
+
+asterisk_scf_slice_include_directories("${CMAKE_SOURCE_DIR}/slice") 
+asterisk_scf_compile_slice(BasicRoutingStateReplicationIf.ice lib "Basic Routing State Replicator" BasicRoutingService)
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 0babd69..4b9d3c4 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -36,6 +36,7 @@
 #include "EndpointRegistry.h"
 #include "RoutingAdmin.h"
 #include "SessionRouter.h"
+#include "OperationReplicaCache.h"
 
 using namespace std;
 using namespace AsteriskSCF::SessionCommunications::V1;
@@ -65,6 +66,7 @@ public:
         : mDone(false), 
           mInitialized(false), 
           mRunning(false),
+          mSessionContext(new SessionContext()),
           mWorkQueue( new AsteriskSCF::Threading::SimpleWorkQueue("SessionRouterWorkQueue", lg))
     {
     }
@@ -108,6 +110,8 @@ private:
     Discovery::V1::ServiceManagementPrx mComponentServiceManagement;
     Discovery::V1::ServiceManagementPrx mSessionRouterManagement;
 
+    SessionContextPtr mSessionContext;
+
     // Our published interfaces.
     BasicSessionRouterPtr mSessionRouter;
     RoutingServiceAdminPtr mAdminInteface;
@@ -115,6 +119,7 @@ private:
     ComponentTestPtr mComponentTest;
     AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> mBridgeManager;
     RoutingServiceEventPublisherPtr mEventPublisher;
+    boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
     EndpointRegistryPtr mEndpointRegistry;
 
     // Replication support
@@ -194,7 +199,13 @@ private:
 class ReplicaImpl : public Replica
 {
 public:
-    ReplicaImpl(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter, bool active) : mApp(app), mAdapter(adapter), mPaused(false), mActive(active) 
+    /**
+     * Constructor. 
+     *  @param app 
+     *  @param adapter The adapter is assumed to have been activated. 
+     *  @param active 
+     */
+    ReplicaImpl(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter, bool active) : mApp(app), mAdapter(adapter), mActive(active) 
     { 
         if (mActive)
         {
@@ -261,7 +272,6 @@ private:
      */
     vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
 
-    bool mPaused;
     bool mActive;
 
     BasicRoutingServiceApp& mApp;
@@ -416,7 +426,8 @@ void BasicRoutingServiceApp::locateBridgeManager()
         mServiceLocator,
         new ServiceLocatorParams(BridgeServiceDiscoveryCategory),
         lg);
-    mSessionRouter->setBridgeManager(mBridgeManager);
+
+    mSessionContext->bridgeManager = mBridgeManager;
 
     if (!mBridgeManager.isInitialized())
     {
@@ -436,12 +447,16 @@ void BasicRoutingServiceApp::locateStateReplicator()
     AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> pw(mServiceLocator, replicatorParams, lg);
     mStateReplicator = pw;
 
-    mSessionRouter->setStateReplicator(mStateReplicator);
+    pushStateReplicator();
 }
 
+/**
+ * Pushes the state replicator to everything in this components that needs to know. 
+ */
 void BasicRoutingServiceApp::pushStateReplicator()
 {
-    mEventPublisher->setStateReplicator(mStateReplicator);
+    mEndpointRegistry->setStateReplicator(mStateReplicator);
+    mSessionContext->stateReplicator = mStateReplicator;
 }
 
 /**
@@ -475,9 +490,20 @@ void BasicRoutingServiceApp::initialize()
         // as a facet of ComponentService.
         mComponentTest = new ComponentTestImpl(*this);
 #endif
+        // Create the session context needed to construct operations.
+        SessionContext *rawSessionContext(new SessionContext(mAdapter, 
+                                                             mEndpointRegistry, 
+                                                             mEventPublisher, 
+                                                             mWorkQueue));
+        SessionContextPtr sessionContextPtr(rawSessionContext); 
+        mSessionContext = sessionContextPtr;
+                
+        // Create the replica cache. 
+        boost::shared_ptr<OperationReplicaCache> ptr(new OperationReplicaCache(mSessionContext));
+        mOperationReplicaCache = ptr;
 
         // Create publish the SessionRouter interface.
-        SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher, mWorkQueue));
+        SessionRouter *rawSessionRouter(new SessionRouter(mSessionContext, mOperationReplicaCache));
         BasicSessionRouterPtr basicSessionPtr(rawSessionRouter);
         mSessionRouter = basicSessionPtr;
         mAdapter->add(rawSessionRouter, mCommunicator->stringToIdentity(SessionRouterObjectId));
@@ -496,7 +522,7 @@ void BasicRoutingServiceApp::initialize()
         mAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
 
         // Create and publish our state replicator listener interface.
-        mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry);
+        mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry, mOperationReplicaCache);
         mReplicatorListenerProxy = RoutingStateReplicatorListenerPrx::uncheckedCast(mAdapter->addWithUUID(mReplicatorListener));
 
         mAdapter->activate();
diff --git a/src/BasicRoutingStateReplicator.h b/src/BasicRoutingStateReplicator.h
index 9b267f9..c603bbc 100644
--- a/src/BasicRoutingStateReplicator.h
+++ b/src/BasicRoutingStateReplicator.h
@@ -20,6 +20,7 @@
 #include <AsteriskSCF/StateReplicator.h>
 #include "BasicRoutingStateReplicationIf.h"
 #include "EndpointRegistry.h"
+#include "OperationReplicaCache.h"
 
 namespace AsteriskSCF
 {
@@ -40,7 +41,7 @@ typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
 class RoutingStateReplicatorListenerI : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
 {
 public:
-    RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry);
+    RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
     ~RoutingStateReplicatorListenerI();
     void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
     void stateSet(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 65f0bc7..4248064 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -168,7 +168,6 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
         mServiceLocatorManagement->addCompare(compareGuid, compareProxy);
         mStateReplicationManagement->addLocatorParams(discoveryParams, compareGuid);
 
-        // TBD... We may have other interfaces to publish to the Service Locator.
     }
     catch(...)
     {
@@ -202,9 +201,12 @@ void BasicRoutingStateReplicatorService::initialize(const std::string appName, c
     getLoggerFactory().setLogOutput(mIceLogger->getLogger());
 
     mAppName = appName;
+
     // Create and publish our ComponentService interface support.
     mComponentService = new ComponentServiceImpl(*this);
     mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
+
+    // Create our instance of the StateReplicator template. 
     mStateReplicator = new RoutingStateReplicatorI();
     mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
 
diff --git a/src/BasicRoutingStateReplicatorListener.cpp b/src/BasicRoutingStateReplicatorListener.cpp
index d8271fa..53def89 100644
--- a/src/BasicRoutingStateReplicatorListener.cpp
+++ b/src/BasicRoutingStateReplicatorListener.cpp
@@ -20,6 +20,7 @@
 #include <boost/shared_ptr.hpp>
 
 #include "BasicRoutingStateReplicator.h"
+#include "OperationReplicaCache.h"
 
 using namespace AsteriskSCF::BasicRoutingService::V1;
 
@@ -50,8 +51,10 @@ private:
 struct RoutingStateReplicatorListenerImpl
 {
 public:
-   RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry)
-        : mId(IceUtil::generateUUID()) 
+   RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+        : mId(IceUtil::generateUUID()),
+          mEndpointRegistry(registry),
+          mOperationReplicaCache(opCache)
    {
    }
 
@@ -61,30 +64,74 @@ public:
 
     void setStateNoticeImpl(const RoutingStateItemSeq& items, const Ice::Current& current)
     {
-        for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
-        {
-            EndpointLocatorAddPtr locatorAdd;
-            EndpointLocatorRemovePtr locatorRemove;
-            boost::shared_ptr<RoutingStateReplicatorItem> localitem;
+	    class visitor : public AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitor
+	    {
+	    public:
+	        visitor(RoutingStateReplicatorListenerImpl *impl) : mImpl(impl)
+	        {
+	        }
+
+	    private:
+	        RoutingStateReplicatorListenerImpl *mImpl;
+
+            void visitRoutingStateItem(const ::AsteriskSCF::BasicRoutingService::V1::RoutingStateItemPtr&)
+            {
+                
+            }
+
+            void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr&)
+            {
+            }
+
+            void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr&)
+            {
+            }
+
+            void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr&)
+            {
+            }
 
-            // Depending on the type of state item we apply it differently
-            if ((locatorAdd = EndpointLocatorAddPtr::dynamicCast((*item))))
+            void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr&)
             {
-                mEndpointRegistry->addEndpointLocator(locatorAdd->key, locatorAdd->regExList, locatorAdd->locator,  current);
             }
-            else if ((locatorRemove = EndpointLocatorRemovePtr::dynamicCast((*item))))
+
+            void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr&)
+            {
+            }
+
+            void visitEndpointLocatorAdd(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorAddPtr& item)
             {
-                mEndpointRegistry->removeEndpointLocator(locatorAdd->key, current);
+                mImpl->mEndpointRegistry->addEndpointLocator(item->key, item->regExList, item->locator, ::Ice::Current());
             }
+
+            void visitEndpointLocatorRemove(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorRemovePtr& item)
+            {
+                mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
+            }
+
+            void visitEndpointLocatorSetDestIds(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorSetDestIdsPtr& item)
+            {
+                mImpl->mEndpointRegistry->setEndpointLocatorDestinationIds(item->key, item->regExList, ::Ice::Current());
+            }
+
+        }; // end method-local visitor def
+
+        // Create the visitor. Smart pointer will cleanup when this method exits. 
+        AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitorPtr v = new visitor(this);
+
+        for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+        {
+	        (*item)->visit(v);
         }
     }
 
     std::string mId;
     EndpointRegistryPtr mEndpointRegistry;
+    boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
 };
 
-RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry)
-    : mImpl(new RoutingStateReplicatorListenerImpl(registry)) 
+RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+    : mImpl(new RoutingStateReplicatorListenerImpl(registry, opCache)) 
 {
 }
 
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 64b7aec..8024a59 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -36,7 +36,9 @@ asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperat
 asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.cpp)
 asterisk_scf_component_add_file(BasicRoutingService SessionListener.h)
 asterisk_scf_component_add_file(BasicRoutingService SessionListener.cpp)
-asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicator.h)
+asterisk_scf_component_add_file(BasicRoutingService OperationReplicaCache.h)
+asterisk_scf_component_add_file(BasicRoutingService OperationReplicaCache.cpp)
+asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicator.h)
 asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicatorListener.cpp)
 
 asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 4e45033..9c637d0 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -50,16 +50,18 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
                                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
                                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
                                                                 const ::Ice::Current& current,
-                                                                const SessionContext& context,
-                                                                OperationsManager* const listener)
+                                                                const SessionContextPtr& context,
+                                                                OperationsManager* const listener,
+                                                                std::string transactionId)
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr, ConnectBridgedSessionsOp::OperationState>(cb, 
                                                                                                                         context, 
                                                                                                                         listener,
-                                                                                                          ConnectBridgedSessionsOp::STATE_CONNECT),
+                                                                                                          ConnectBridgedSessionsOp::STATE_CONNECT,
+                                                                                                          transactionId),
         mInitiatorCallback(cb),
         mSessionToReplace(sessionToReplace),
         mBridgedSession(bridgedSession),
-        mIceCurrent(current)
+        mIceCurrent(current) 
 {
     Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
     if (it == current.ctx.end())
@@ -72,6 +74,37 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
         mStateMachine.addState(ConnectBridgedSessionsOp::STATE_CONNECT, boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this));
 }
 
+/**
+ * Factory method for the operation. 
+ */
+ConnectBridgedSessionsOperationPtr ConnectBridgedSessionsOperation::create(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+                                                                           const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                                           const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
+                                                                           const ::Ice::Current& current,
+                                                                           const SessionContextPtr& context,
+                                                                           OperationsManager* const listener)
+{
+    // We don't really care about the transaction id on this operation, since we don't replicate this operation.
+    Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
+    std::string transactionId = "unused";
+    if (it == current.ctx.end())
+    {
+         lg(Error) << "ConnectBridgedSessionsOperation() called with no transaction ID set. Ignored."  ;
+         transactionId = (it->second);
+    }
+    
+    ConnectBridgedSessionsOperationPtr ptr(new ConnectBridgedSessionsOperation(cb,
+                                                                               sessionToReplace,
+                                                                               bridgedSession,
+                                                                               current,
+                                                                               context,
+                                                                               listener,
+                                                                               transactionId));
+
+    return ptr;
+}
+ 
+
 ConnectBridgedSessionsOperation::~ConnectBridgedSessionsOperation() 
 {
         lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
@@ -110,7 +143,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
 
     // Create a listener for the sessions not being replaced to handle early termination.
     lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
-    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext->adapter, preserveSessions));
     mListenerManager = listener;
 
     // Get the bridge for the sessions being moved.
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index fc9364f..2484d5a 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -43,6 +43,10 @@ namespace ConnectBridgedSessionsOp
     };
 }
 
+class ConnectBridgedSessionsOperation;
+
+typedef boost::shared_ptr<ConnectBridgedSessionsOperation> ConnectBridgedSessionsOperationPtr;
+
 /**
  * This class represents an operation to replace one session in a Bridge with sessions 
  * from another bridge. No routing is actually performed. This operation exists here for consistency.
@@ -57,15 +61,25 @@ class  ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskS
                                                                        ConnectBridgedSessionsOp::OperationState>
 {
 public:
-    ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+    static ConnectBridgedSessionsOperationPtr create(
+                          const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
                           const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
                           const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
                           const Ice::Current& current,
-                          const SessionContext& context,
+                          const SessionContextPtr& context,
                           OperationsManager* const listener);
 
     virtual ~ConnectBridgedSessionsOperation();
 
+protected:
+    ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+                                    const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                    const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession, 
+                                    const Ice::Current& current,
+                                    const SessionContextPtr& context,
+                                    OperationsManager* const listener,
+                                    std::string transactionId);
+
 private:
     /**
      * This is a state handler for one of this operation's states. 
@@ -81,7 +95,6 @@ private:
     ::Ice::Current mIceCurrent;
 
 }; // class ConnectBridgedSessionsOperation
-
 	
 } // end BasicRoutingService
 } // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 0ef9de0..80ca61c 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -18,12 +18,15 @@
 
 #include <AsteriskSCF/logger.h>
 
+#include "BasicRoutingStateReplicationIf.h"
 #include "ConnectBridgedSessionsWithDestinationOperation.h"
 
 using namespace AsteriskSCF;
 using namespace AsteriskSCF::Core::Routing::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::System::Logging;
+using namespace ::AsteriskSCF::BasicRoutingService::V1;
+using namespace AsteriskSCF::StateMachine;
 
 namespace
 {
@@ -36,11 +39,114 @@ namespace BasicRoutingService
 {
 
 /**
- * This operation replaces one session in a Bridge with a new session routable by the 
- * destination param. This is a specialization of  SessionRouterOperation<T> that handles the
- * connectBridgedSessionsWithDestination() operation. The template parameter T is the type 
- * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to 
- * the initiator of this operation. 
+ * This listener monitors the progress of an operation, and pushes relevant state to the replicator. 
+ */
+class ConnectBridgedSessionsWithDestReplicatingListener : public SimpleStateMachine<ConnectBridgedSessionsWithDestinationOp::OperationState>::StateMachineListener
+{
+public:
+    ConnectBridgedSessionsWithDestReplicatingListener(ConnectBridgedSessionsWithDestinationOperationPtr op, 
+                          const AsteriskSCF::SmartProxy::SmartProxy<
+                            AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)  
+                            : mOperation(op), mReplicator(replicator)
+    {
+    }
+
+    /**
+     * This callback is called just before the execution of a state machine's current state handler. 
+     */
+    void stateExecutionStart(ConnectBridgedSessionsWithDestinationOp::OperationState state)
+    {
+        switch(state)
+        {
+        case ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP:
+            {
+            // Push this information to the state replicator.
+            ConnectBridgedSessionsWithDestinationOpStartPtr opStart(new ConnectBridgedSessionsWithDestinationOpStart());
+            opStart->transactionId = mOperation->getTransactionId();
+            opStart->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestStartKeyMod;
+            opStart->sessionToReplace = mOperation->getSessionToReplace();
+            opStart->destination = mOperation->getDestination();
+
+            pushState(opStart);
+            }
+            break;
+
+        case ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS:
+            {
+            // We just completed the entire operation. 
+            // Push this information to the state replicator.
+            ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup(new ConnectBridgedSessionsWithDestinationOpWaitLookupState());
+            waitLookup->transactionId = mOperation->getTransactionId();
+            waitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
+
+            pushState(waitLookup);
+            }
+            break;
+
+        case ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING:
+            {
+            // We just completed the entire operation. 
+            // Push this information to the state replicator.
+            ConnectBridgedSessionsWithDestinationOpBridgingStatePtr bridgeOp(new ConnectBridgedSessionsWithDestinationOpBridgingState());
+            bridgeOp->transactionId = mOperation->getTransactionId();
+            bridgeOp->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
+            bridgeOp->endpoints = mOperation->getLookupResult();
+
+            pushState(bridgeOp);
+            }
+            break;
+        }
+    }
+
+    void pushState(RoutingStateItemPtr item)
+    {
+        RoutingStateItemSeq setItems;
+
+        setItems.push_back(item);
+        mReplicator->setState(setItems);
+        
+        // Cache the keys of all pushed items.
+        mReplicatedStateKeys.push_back(item->key);
+    }
+
+    /**
+     * This callback is called just before the execution of a state machine's current state handler. 
+     */
+    void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
+    {
+    }
+
+    /**
+     * This method is sent when the operation state machine is shutting down. 
+     */
+    void shutdown()
+    {
+        // We just completed the entire operation. 
+        // Remove the items that represented this operation's state transitions from the state replicator.
+        mReplicator->removeState(mReplicatedStateKeys);
+
+        // Release our reference to the operation. 
+        mOperation.reset();
+    }
+
+    /**
+     * This is called when a state transition is occuring. 
+     */
+    void stateTransition(ConnectBridgedSessionsWithDestinationOp::OperationState oldState, 
+                         ConnectBridgedSessionsWithDestinationOp::OperationState newState)
+    {
+    }
+
+private:
+    Ice::StringSeq mReplicatedStateKeys;
+    ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
+    AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mReplicator;
+
+}; // end ConnectBridgedSessionsWithDestReplicatingListener
+
+/**
+ * Primary constructor. This class represents an operation that replaces one session in a 
+ * Bridge with a new session routable by the destination param. 
  * 
  * This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread. 
  */
@@ -48,29 +154,78 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
                           const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
                           const ::std::string& destination, 
                           const ::Ice::Current& current,
-                          const SessionContext& context,
-                          OperationsManager* const listener)
+                          const SessionContextPtr& context,
+                          OperationsManager* const listener,
+                          std::string transactionId)
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(cb, 
                                 context, 
                                 listener,
-                                ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP),
+                                ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
+                                transactionId),
         mInitiatorCallback(cb),
         mSessionToReplace(sessionToReplace),
         mDestination(destination),
         mIceCurrent(current)
 {
+    mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
+    mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
+    mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+}
+
+/**
+ * This is the factory method for this operation.
+ */
+ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDestinationOperation::create(
+                          const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContextPtr& context,
+                          OperationsManager* const listener)
+{
     Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
     if (it == current.ctx.end())
     {
          lg(Error) << "ConnectBridgedSessionsWithDestinationOperation() called with no transaction ID set!"  ;
          throw InvalidParamsException();
     }
+    std::string transactionId = (it->second);
 
-    mTransactionId = (it->second);
+    ConnectBridgedSessionsWithDestinationOperationPtr op( new ConnectBridgedSessionsWithDestinationOperation(cb,
+                                                        sessionToReplace,
+                                                        destination,
+                                                        current,
+                                                        context,
+                                                        listener,
+                                                        transactionId) );
+
+    boost::shared_ptr<SimpleStateMachine<ConnectBridgedSessionsWithDestinationOp::OperationState>::StateMachineListener> replicatingListener(new ConnectBridgedSessionsWithDestReplicatingListener(op, context->stateReplicator));
+    op->addStateMachineListener(replicatingListener);
+
+    return op;
+}
+                
+/**
+ * Constructor to service replicas. 
+ */
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context)
+        : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(
+                                context, 
+                                ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP
+                                )
+{
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
     mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+}
 
+/**
+ * Factory for replica objects. 
+ */
+ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDestinationOperation::createReplica(const SessionContextPtr& context)
+{
+    ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(context));
+    return op;
 }
 
 ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDestinationOperation() 
@@ -78,6 +233,47 @@ ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDesti
         lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
 }
 
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem)
+{
+    ConnectBridgedSessionsWithDestinationOpStartPtr start;
+    ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup;
+    ConnectBridgedSessionsWithDestinationOpBridgingStatePtr bridging;
+
+    if ((start = ConnectBridgedSessionsWithDestinationOpStartPtr::dynamicCast(stateItem)) != 0)
+    {
+        reflectUpdate(start);
+    }
+    else if ((waitLookup = ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr::dynamicCast(stateItem)) != 0)
+    {
+        reflectUpdate(waitLookup);
+    }
+    else if ((bridging = ConnectBridgedSessionsWithDestinationOpBridgingStatePtr::dynamicCast(stateItem)) != 0)
+    {
+        reflectUpdate(bridging);
+    }
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr item)
+{
+    mSessionToReplace = item->sessionToReplace;
+    mDestination = item->destination;
+    mTransactionId = item->transactionId;
+
+    mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr item)
+{
+    mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr item)
+{
+    mLookupResult = item->endpoints;
+
+    mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+}
+
 /**
  * This is a state handler for one of this operation's states. 
  */
@@ -109,7 +305,7 @@ void ConnectBridgedSessionsWithDestinationOperation::lookupState()
     // The wrapper we're using will remove the listener and free it when
     // this method is left.
     lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
-    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext->adapter, mSessionToReplace));
     mListenerManager = listener;
 
     // Route the destination
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index 61557a2..b1d35b1 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -46,6 +46,10 @@ namespace ConnectBridgedSessionsWithDestinationOp
     };
 }
 
+class ConnectBridgedSessionsWithDestinationOperation;
+
+typedef boost::shared_ptr<ConnectBridgedSessionsWithDestinationOperation> ConnectBridgedSessionsWithDestinationOperationPtr;
+
 /**
  * This operation replaces one session in a Bridge with a new session routable by the 
  * destination param. This is a specialization of  SessionRouterOperation<T> that handles the
@@ -59,17 +63,51 @@ namespace ConnectBridgedSessionsWithDestinationOp
                                                                                        ConnectBridgedSessionsWithDestinationOp::OperationState>
 {
 public:
-    ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
-                          const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
-                          const std::string& destination, 
-                          const ::Ice::Current& current,
-                          const SessionContext& context,
-                          OperationsManager* const listener);
+    /**
+     * Factory method for the class. This method creates an active operation. 
+     */
+    static ConnectBridgedSessionsWithDestinationOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                                                                    const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                                                                    const std::string& destination, 
+                                                                    const ::Ice::Current& current,
+                                                                    const SessionContextPtr& context,
+                                                                    OperationsManager* const listener);
 
     virtual ~ConnectBridgedSessionsWithDestinationOperation();
 
 
+    AsteriskSCF::SessionCommunications::V1::SessionPrx getSessionToReplace() {return mSessionToReplace;}
+
+    std::string getDestination() {return mDestination;}
+
+    /**
+     * Factory method for replica objects. 
+     */
+    static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(const SessionContextPtr& context);
+
+    /**
+     * Update a replica object with new state information.
+     */
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
+
+protected:
+    ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+                        const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace, 
+                        const std::string& destination, 
+                        const ::Ice::Current& current,
+                        const SessionContextPtr& context,
+                        OperationsManager* const listener,
+                        std::string transactionId);
+
+    // Constructor to service replicas.
+    ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context);
+
 private:
+
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr stateItem);
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr stateItem);
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr stateItem);
+
     /**
      * This is a state handler for one of this operation's states. 
      */
@@ -101,9 +139,9 @@ private:
     AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
     AsteriskSCF::SessionCommunications::V1::SessionSeq mRemainingSessions;
 
-}; // class ConnectBridgedSessionsWithDestinationOperation
+    std::vector<ConnectBridgedSessionsWithDestinationOp::OperationState> mReplicatedStates;
 
+}; // class ConnectBridgedSessionsWithDestinationOperation
 
-	
 } // end BasicRoutingService
 } // end AsteriskSCF
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 34eb1a2..3c1fe6e 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -26,11 +26,13 @@
 #include "RoutingServiceEventPublisher.h"
 #include "ScriptProcessor.h"
 
+using namespace ::std;
 using namespace ::AsteriskSCF::Core::Endpoint::V1;
 using namespace ::AsteriskSCF::Core::Routing::V1;
 using namespace ::AsteriskSCF::System::Logging;
 using namespace ::AsteriskSCF::Core::Routing::V1::Event;
-using namespace ::std;
+using namespace ::AsteriskSCF::BasicRoutingService::V1;
+using namespace ::AsteriskSCF::SmartProxy;
 
 namespace
 {
@@ -41,7 +43,6 @@ namespace AsteriskSCF
 {
 namespace BasicRoutingService
 {
-
 struct RegisteredLocator
 {
 public:
@@ -115,8 +116,119 @@ public:
         destination.insert(mEndpointLocatorMap.begin(), mEndpointLocatorMap.end());
     }
 
+    /**
+     * Forwards the result of processing a remove endpoint operation. 
+     */
+    void forwardRemoveEndpointLocator(const std::string& locatorId, Event::OperationResult result)
+    {
+        if (!mActive)
+        {
+            return;
+        }
+
+        // Forward to state replicator
+        if (result == Event::SUCCESS)
+        {
+            try
+            {
+                // Push this information to the state replicator.
+                RoutingStateItemSeq setItems;
+
+                EndpointLocatorRemovePtr removeEndpointItem(new EndpointLocatorRemove());
+                removeEndpointItem->key = locatorId;
+
+                setItems.push_back(removeEndpointItem);
+
+                mStateReplicator->setState(setItems);
+            }
+            catch(const Ice::Exception& e)
+            {
+                 lg(Debug) << "EndpointRegistry unable to replicate removeEndpointLocator(): " << e.what();
+            }
+        }
+
+        // Forward to event publisher.
+        mEventPublisher->removeEndpointLocatorEvent(locatorId, result);
+    }
+
+    /**
+     * Forwards the result of processing a remove endpoint operation. 
+     */
+    void forwardAddEndpointLocator(const std::string& locatorId,
+                                   const RegExSeq& regexList,
+                                   const EndpointLocatorPrx& locator, 
+                                   Event::OperationResult result)
+    {
+        if (!mActive)
+        {
+            return;
+        }
+
+        // Forward to state replicator
+        if (result == Event::SUCCESS)
+        {
+            try
+            {
+                // Push this information to the state replicator.
+                RoutingStateItemSeq setItems;
+
+                EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
+                addEndpointItem->key = locatorId;
+                addEndpointItem->locator = locator;
+                addEndpointItem->regExList = regexList;
+
+                setItems.push_back(addEndpointItem);
+
+                mStateReplicator->setState(setItems);
+            }
+            catch(const Ice::Exception& e)
+            {
+                lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
+            }
+        }
+
+        // Forward to event publisher.
+        mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, result);
+    }
+
+    void forwardEndpointLocatorDestIdChange(const std::string& locatorId,
+                                            const RegExSeq& regexList, 
+                                            OperationResult result)
+    {
+        if (!mActive)
+        {
+            return;
+        }
+
+        // Forward to state replicator
+        if (result == Event::SUCCESS)
+        {
+            try
+            {
+                // Push this information to the state replicator.
+                RoutingStateItemSeq setItems;
+
+                EndpointLocatorSetDestIdsPtr setDestIdsItem(new EndpointLocatorSetDestIds());
+                setDestIdsItem->key = locatorId;
+                setDestIdsItem->regExList = regexList;
+
+                setItems.push_back(setDestIdsItem);
+
+                mStateReplicator->setState(setItems);
+            }
+            catch(const Ice::Exception& e)
+            {
+                lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
+            }
+        }
+
+        mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
+    }
+
     boost::shared_mutex mLock;
 
+    AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
+
     boost::shared_ptr<ScriptProcessor> mScriptProcessor;
     EndpointLocatorMap mEndpointLocatorMap;
     const RoutingEventsPtr mEventPublisher;
@@ -231,6 +343,11 @@ void EndpointRegistry::setActive(bool isActive)
     mImpl->mActive = isActive;
 }
 
+void EndpointRegistry::setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx>& replicator)
+{
+    mImpl->mStateReplicator = replicator;
+}
+
 /**
  * Returns the endpoints that match the specified destination id.
  *   @param id String identifier of the the destination.
@@ -341,13 +458,13 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
         RegisteredLocator newLocator(locator, regexList);
         mImpl->insertLocatorMapItem(locatorId, newLocator);
 
-        mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, Event::SUCCESS);
+        mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::SUCCESS);
     }
     catch (...)
     {
 
         lg(Error) << "Exception adding EndpointLocator.";
-        mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, Event::FAILURE);
+        mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::FAILURE);
         return;
     }
 }
@@ -368,19 +485,19 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
         if (!exists)
         {
             lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
-            mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
+            mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
             return;
         }
 
         mImpl->eraseLocatorMapItem(locatorId);
 
-        mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::SUCCESS);
+        mImpl->forwardRemoveEndpointLocator(locatorId, Event::SUCCESS);
 
         lg(Info) << "Removed Endpoint Locator with Id = " << locatorId;
     }
     catch(const std::exception &e)
     {
-        mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
+        mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
         lg(Error) << e.what();
     }
 }
@@ -401,18 +518,18 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
 
         if (!exists)
         {
-            mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
+            mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, Event::FAILURE);
             throw DestinationNotFoundException(locatorId);
         }
 
         // Replace the regular expression.
         existing->second.setRegEx(regExList);
-        mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::SUCCESS);
+        mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, Event::SUCCESS);
 
     }
     catch(const std::exception &e)
     {
-        mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
+        mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, Event::FAILURE);
         lg(Error) << "Exception modifying the destination specifications for EndpointLocator " << locatorId;
         lg(Error) << "   - " << e.what();
     }
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
new file mode 100644
index 0000000..3045ab8
--- /dev/null
+++ b/src/OperationReplicaCache.cpp
@@ -0,0 +1,195 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <AsteriskSCF/Threading/WorkQueue.h>
+
+#include "OperationReplicaCache.h"
+#include "SessionRouterOperation.h"
+
+using namespace AsteriskSCF::Threading;
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
+
+/** 
+ * For each transaction id, we're going to cache the all the state items for the operation.
+ * The reason for this is that we don't want to rely on the order in which we will receive the state updates. 
+ * This class is used to hold all the state updates for a given operation. 
+ * 
+ * When an item is called for from the cache, we'll apply all the available state updates we have, 
+ * in order, up to the highest we have (without missing a state, since each state depends on the results 
+ * of the previous.) 
+ *
+ * NOTE: TBD...This class needs some templatized functions to avoid all the duplication in the case statements. 
+ */
+template<typename T>
+struct OperationReplicaItem
+{
+public:
+    T mOperation;
+    StateItemMapType mItems;
+};
+
+typedef std::map<std::string, OperationReplicaItem<RouteSessionOperationPtr> > RouteSessionMapType;
+typedef std::map<std::string, OperationReplicaItem<ConnectBridgedSessionsWithDestinationOperationPtr> > ConnectBridgeWithDestMapType;
+
+class OperationReplicaCachePriv
+{
+public: 
+
+    OperationReplicaCachePriv(const SessionContextPtr& sessionContext) 
+                      : mSessionContext(sessionContext)
+    {
+    }
+
+    SessionContextPtr mSessionContext;
+    RouteSessionMapType routeSessionReplicas;
+    ConnectBridgeWithDestMapType connectBridgedWithDestReplicas;
+};
+
+OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionContext) 
+        : mPriv(new OperationReplicaCachePriv(sessionContext))
+{
+}
+
+
+void OperationReplicaCache::cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item)
+{
+        switch(type)
+        {
+        case ROUTE_SESSION_OP:
+            {
+            // See if this transaction is in the cache.
+            RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(item->transactionId);
+            if (i == mPriv->routeSessionReplicas.end())
+            {
+                // Add an entry to the cache.
+                OperationReplicaItem<RouteSessionOperationPtr> replica;
+                mPriv->routeSessionReplicas[item->transactionId] = replica;
+
+                i = mPriv->routeSessionReplicas.find(item->transactionId);
+            }
+            (*i).second.mItems[item->key] = item;
+
+            // If we haven't created the replica yet, do so now. 
+            if ((*i).second.mOperation.get() == 0)
+            {
+                RouteSessionOperationPtr work(RouteSessionOperation::createReplica(mPriv->mSessionContext));
+                (*i).second.mOperation = work;
+            }
+
+            // Update the replicated object with newest state update. 
+            (*i).second.mOperation->reflectUpdate(item);
+            }
+            break;
+
+        case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+            break;
+
+        case CONNECT_BRIDGED_SESSIONS_OP:
+            // Not replicating this type. 
+            break;
+        }
+}
+
+bool OperationReplicaCache::fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref)
+{
+    switch(type)
+    {
+    case ROUTE_SESSION_OP:
+    {
+        if (mPriv->routeSessionReplicas.empty())
+        {
+            return false;
+        }
+
+        RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
+        if (i == mPriv->routeSessionReplicas.end())
+        {
+            return false;
+        }
+
+        ref = (*i).second.mOperation;
+        mPriv->routeSessionReplicas.erase(i);
+
+        return true;
+    }
+    break;
+
+    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+    {
+        if (mPriv->connectBridgedWithDestReplicas.empty())
+        {
+            return false;
+        }
+    }
+    break;
+    }
+
+    return false;
+}
+
+void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::string transactionId)
+{
+    switch(type)
+    {
+    case ROUTE_SESSION_OP:
+        {
+            RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
+            if (i != mPriv->routeSessionReplicas.end())
+            {
+                 mPriv->routeSessionReplicas.erase(i);
+            }
+        }
+        break;
+
+    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+        {
+            ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
+            if (i != mPriv->connectBridgedWithDestReplicas.end())
+            {
+                 mPriv->connectBridgedWithDestReplicas.erase(i);
+            }
+        }
+        break;
+    }
+}
+
+void OperationReplicaCache::clearCache(OperationType type)
+{
+    switch(type)
+    {
+    case ROUTE_SESSION_OP:
+        mPriv->routeSessionReplicas.clear();
+        break;
+
+    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+        mPriv->connectBridgedWithDestReplicas.clear();
+        break;
+    }
+}
+
+void OperationReplicaCache::clearCache()
+{
+    mPriv->routeSessionReplicas.clear();
+    mPriv->connectBridgedWithDestReplicas.clear();
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
new file mode 100644
index 0000000..ff89752
--- /dev/null
+++ b/src/OperationReplicaCache.h
@@ -0,0 +1,62 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/Threading/WorkQueue.h>
+
+#include "BasicRoutingStateReplicationIf.h"
+#include "SessionRouter.h"
+#include "RouteSessionOperation.h"
+#include "ConnectBridgedSessionsWithDestinationOperation.h"
+#include "ConnectBridgedSessionsOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+enum OperationType
+{
+    ROUTE_SESSION_OP = 0,
+    CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP = 1,
+    CONNECT_BRIDGED_SESSIONS_OP = 2
+};
+
+class OperationReplicaCachePriv;
+class SessionContext;
+
+class OperationReplicaCache
+{
+public: 
+    OperationReplicaCache(const boost::shared_ptr<SessionContext>& sessionContext);
+
+    void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
+    bool fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref);
+    void dropRouteSessionOperation(OperationType type, std::string transactionId);
+
+    void clearCache(OperationType type);
+    void clearCache();
+
+private:
+    boost::shared_ptr<OperationReplicaCachePriv> mPriv;
+};
+typedef boost::shared_ptr<OperationReplicaCache> OperationReplicaCachePtr;
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 127d3d8..97d60d3 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -39,12 +39,12 @@ namespace BasicRoutingService
 {
 
 /**
- * This listener monitors the progress of an operation, and push
+ * This listener monitors the progress of an operation, and pushes relevant state to the replicator. 
  */
-class RouteSessionReplicatingListener : public SimpleStateMachine<RouteSessionOp::OperationState, boost::function<void ()> >::StateMachineListener
+class RouteSessionReplicatingListener : public SimpleStateMachine<RouteSessionOp::OperationState>::StateMachineListener
 {
 public:
-    RouteSessionReplicatingListener(RouteSessionOperation *op, 
+    RouteSessionReplicatingListener(RouteSessionOperationPtr op, 
                           const AsteriskSCF::SmartProxy::SmartProxy<
                             AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)  
                             : mOperation(op), mReplicator(replicator)
@@ -60,30 +60,26 @@ public:
         {
         case RouteSessionOp::STATE_LOOKUP:
             {
+            // This is the initial state. All the state of interest is what's been passed in.
             // Push this information to the state replicator.
-            RoutingStateItemSeq setItems;
-
             RouteSessionOpStartPtr routeSessionOpStart(new RouteSessionOpStart());
+            routeSessionOpStart->transactionId =  mOperation->getTransactionId();
             routeSessionOpStart->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartKeyMod;
             routeSessionOpStart->source = mOperation->getSource();
             routeSessionOpStart->destination = mOperation->getDestination();
 
-            setItems.push_back(routeSessionOpStart);
-            mReplicator->setState(setItems);
+            pushState(routeSessionOpStart);
             }
             break;
 
         case RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS:
             {
-            // We just completed the entire operation. 
-            // Push this information to the state replicator.
-            RoutingStateItemSeq setItems;
-
+            // We've sent out our lookup request via AMI.
             RouteSessionOpWaitLookupStatePtr routeSessionOpWaitLookup(new RouteSessionOpWaitLookupState());
+            routeSessionOpWaitLookup->transactionId =  mOperation->getTransactionId();
             routeSessionOpWaitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
 
-            setItems.push_back(routeSessionOpWaitLookup);
-            mReplicator->setState(setItems);
+            pushState(routeSessionOpWaitLookup);
             }
             break;
 
@@ -91,40 +87,47 @@ public:
             {
             // We just completed the entire operation. 
             // Push this information to the state replicator.
-            RoutingStateItemSeq setItems;
-
             RouteSessionOpBridgingStatePtr routeSessionOpBridging(new RouteSessionOpBridgingState());
+            routeSessionOpBridging->transactionId =  mOperation->getTransactionId();
             routeSessionOpBridging->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
             routeSessionOpBridging->endpoints = mOperation->getLookupResult();
 
-            setItems.push_back(routeSessionOpBridging);
-            mReplicator->setState(setItems);
+            pushState(routeSessionOpBridging);
             }
             break;
   
         }
     }
 
+    void pushState(RoutingStateItemPtr item)
+    {
+        RoutingStateItemSeq setItems;
+
+        setItems.push_back(item);
+        mReplicator->setState(setItems);
+        
+        // Cache the keys of all pushed items.
+        mReplicatedStateKeys.push_back(item->key);
+    }
+
     /**
-     * This callback is called just before the execution of a state machine's current state handler. 
+     * This callback is called just after the execution of a state machine's current state handler. 
      */
     void stateExecutionComplete(RouteSessionOp::OperationState state)
     {
-        switch(state)
-        {
-        case RouteSessionOp::STATE_BRIDGING:
-            // We just completed the entire operation. 
-            // Remove the items that represented this operation's state transitions.
-            Ice::StringSeq stateKeys;
-
-             stateKeys.push_back(mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartKeyMod);
-             stateKeys.push_back(mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod);
-             stateKeys.push_back(mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod);
+    }
 
-            mReplicator->removeState(stateKeys);
-            break;
-        }
+    /**
+     * This method is sent when the operation state machine is shutting down. 
+     */
+    void shutdown()
+    {
+        // We just completed the entire operation. 
+        // Remove the items that represented this operation's state transitions.
+        mReplicator->removeState(mReplicatedStateKeys);
 
+        // Release our reference to the operation. 
+        mOperation.reset();
     }
 
     /**
@@ -135,15 +138,14 @@ public:
     }
 
 private:
-    RouteSessionOperation* mOperation;
+    Ice::StringSeq mReplicatedStateKeys;
+    RouteSessionOperationPtr mOperation;
     AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mReplicator;
 };
 
 /**
- * This is a specialization of the SessionRouterOperation<T> to handle the
- * routeSession() operation. The template parameter T is the type of the routeSession()
- * AMD callback handler to allow this object to send results to the initiator of this
- * operation. 
+ * Constructor. This class is a specialization of the SessionRouterOperation<T> to handle the
+ * routeSession() operation. 
  * 
  * This object is an instance of WorkQueue::Work so that
  * it can be enqueued to a worker thread. 
@@ -152,33 +154,121 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
                                                 const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
                                                 const ::std::string& destination, 
                                                 const ::Ice::Current& current,
-                                                const SessionContext& context,
-                                                OperationsManager* const listener) 
+                                                const SessionContextPtr& context,
+                                                OperationsManager* const listener,
+                                                std::string transactionId) 
                 : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(cb, 
                                                                                                             context, 
                                                                                                             listener,
-                                                                                                            RouteSessionOp::STATE_LOOKUP),
+                                                                                                            RouteSessionOp::STATE_LOOKUP,
+                                                                                                            transactionId),
                 mInitiatorCallback(cb),
                 mSource(source),
                 mDestination(destination),
                 mIceCurrent(current)
 {
+    // Configure the state machine with state handlers. 
+    mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
+    mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
+    mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
+}
+
+/**
+ * This is the factory method for RouteSessionOperation.
+ */
+RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_routeSessionPtr& cb,
+                              const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                              const ::std::string& destination, 
+                              const ::Ice::Current& current,
+                              const SessionContextPtr& context,
+                              OperationsManager* const listener) 
+{
     Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
     if (it == current.ctx.end())
     {
-         lg(Error) << "RouteSessionOperation() called with no transaction ID set!";
+         lg(Error) << "RouteSessionOperation() called with no transaction ID set in context. Unable to replicate operation.";
          throw InvalidParamsException();
     }
-    mTransactionId = (it->second);
+    std::string transactionId = (it->second);
+
+    RouteSessionOperationPtr op (new RouteSessionOperation(cb,
+                                                            source,
+                                                            destination,
+                                                            current,
+                                                            context,
+                                                            listener,
+                                                            transactionId));
 
-    boost::shared_ptr<SimpleStateMachine<RouteSessionOp::OperationState, boost::function<void ()> >::StateMachineListener > ptr(new RouteSessionReplicatingListener(this, context.stateReplicator));
-    mStateMachine.addListener(ptr);
+    // Create a listener for pushing replication data. 
+    boost::shared_ptr<SimpleStateMachine<RouteSessionOp::OperationState>::StateMachineListener> replicatingListener(new RouteSessionReplicatingListener(op, context->stateReplicator));
+    op->addStateMachineListener(replicatingListener);
+
+    return op;
+}
 
+/**
+ * Alternate constructor for replicas. 
+ */
+RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionContext) 
+                : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(sessionContext, RouteSessionOp::STATE_LOOKUP)
+{
+    // Configure the state machine with state handlers. 
     mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
     mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
     mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
 }
 
+/**
+ * This is the factory method for creating a replica of a RouteSessionOperation.
+ */
+RouteSessionOperationPtr RouteSessionOperation::createReplica(const SessionContextPtr& sessionContext)
+{
+    RouteSessionOperationPtr op (new RouteSessionOperation(sessionContext));
+
+    return op;
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem)
+{
+    RouteSessionOpStartPtr start;
+    RouteSessionOpWaitLookupStatePtr waitLookup;
+    RouteSessionOpBridgingStatePtr bridging;
+
+    if ((start = RouteSessionOpStartPtr::dynamicCast(stateItem)) != 0)
+    {
+        reflectUpdate(start);
+    }
+    else if ((waitLookup = RouteSessionOpWaitLookupStatePtr::dynamicCast(stateItem)) != 0)
+    {
+        reflectUpdate(waitLookup);
+    }
+    else if ((bridging = RouteSessionOpBridgingStatePtr::dynamicCast(stateItem)) != 0)
+    {
+        reflectUpdate(bridging);
+    }
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr item)
+{
+    mSource = item->source;
+    mDestination = item->destination;
+    mTransactionId = item->transactionId;
+
+    mReplicatedStates.push_back(RouteSessionOp::STATE_LOOKUP);
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr item)
+{
+    mReplicatedStates.push_back(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr item)
+{
+    mLookupResult = item->endpoints;
+
+    mReplicatedStates.push_back(RouteSessionOp::STATE_BRIDGING);
+}
+
 RouteSessionOperation::~RouteSessionOperation() 
 {
         lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
@@ -193,7 +283,7 @@ void RouteSessionOperation::lookupState()
 {
     lg(Debug) << "routeSession() entered with destination " << mDestination ;
 
-    if (!mSessionContext.bridgeManager.initializeOnce())
+    if (!mSessionContext->bridgeManager.initializeOnce())
     {
         lg(Error) << "No proxy to BridgeManager.  "
             "Make sure all services are running.";
@@ -205,7 +295,7 @@ void RouteSessionOperation::lookupState()
     // Create a listener for the source to handle early termination.
     // The wrapper we're using will remove the listener and free it when
     // this method is left.
-    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSource));
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext->adapter, mSource));
     mListenerManager = listener;
 
     // Set the state handler to exectute once we've looked up our endpoints. 
@@ -224,8 +314,9 @@ void RouteSessionOperation::lookupState()
  */
 void RouteSessionOperation::waitOnLookupState()
 {
-    if (mFinished)
+    if (mFinished) 
     {
+        // An exception must have terminated this operation.
         return;
     }
 
@@ -285,7 +376,7 @@ void RouteSessionOperation::establishBridgeState()
         bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
 
         lg(Debug) << "routeSession(): Creating bridge.";
-        bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+        bridge = mSessionContext->bridgeManager->createBridge(bridgedSessions, 0);
     }
     catch (const Ice::Exception &e)
     {
@@ -312,6 +403,9 @@ void RouteSessionOperation::establishBridgeState()
 
     // This operation is complete. Send AMD responses. 
     finishAndSendResult();
+
+    // Shutdown the state machine. 
+    mStateMachine.shutdown();
 }
 
 } // end BasicRoutingService
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 2e83ae5..921f429 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -20,6 +20,8 @@
 #include <AsteriskSCF/Core/Routing/RoutingIf.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 
+#include "BasicRoutingStateReplicationIf.h"
+
 #include "SessionRouterOperation.h"
 
 namespace AsteriskSCF
@@ -44,11 +46,12 @@ namespace RouteSessionOp
     };
 }
 
+class RouteSessionOperation;
+typedef boost::shared_ptr<RouteSessionOperation> RouteSessionOperationPtr;
+
 /**
- * This is a specialization of the SessionRouterOperation<T> to handle the
- * routeSession() operation. The template parameter T is the type of the routeSession()
- * AMD callback handler to allow this object to send results to the initiator of this
- * operation. 
+ * This is a specialization of the SessionRouterOperation<T,S> to handle the
+ * routeSession() operation. 
  * 
  * This object is an instance of WorkQueue::Work so that
  * it can be enqueued to a worker thread. 
@@ -58,12 +61,15 @@ class  RouteSessionOperation : public SessionRouterOperation<AsteriskSCF::Sessio
                                                              RouteSessionOp::OperationState>
 {
 public:
-    RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
-                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
-                          const ::std::string& destination, 
-                          const ::Ice::Current& current,
-                          const SessionContext& context,
-                          OperationsManager* const listener);
+    /**
+     * Factory method for the class. This method creates an active routing operation. 
+     */
+    static RouteSessionOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                                           const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                                           const ::std::string& destination, 
+                                           const ::Ice::Current& current,
+                                           const SessionContextPtr& context,
+                                           OperationsManager* const listener);
 
     virtual ~RouteSessionOperation();
 
@@ -71,7 +77,44 @@ public:
 
     std::string getDestination() {return mDestination;}
     
+    /**
+     * Factory method for replica objects. 
+     */
+    static RouteSessionOperationPtr createReplica(const SessionContextPtr& context);
+
+    /**
+     * Update a replica object with new state information.
+     */
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
+
+protected:
+    // Normal constructor
+    RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+                          const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
+                          const ::std::string& destination, 
+                          const ::Ice::Current& current,
+                          const SessionContextPtr& context,
+                          OperationsManager* const listener,
+                          std::string transactionId);
+
+   // Constructor for replicas.
+   RouteSessionOperation(const SessionContextPtr& context);
+
 private:
+    /**
+     * Update a replica object with new state information of a specific type.
+     */
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr item);
+
+    /**
+     * Update a replica object with new state information of a specific type.
+     */
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr item);
+
+    /**
+     * Update a replica object with new state information of a specific type.
+     */
+    void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr item);
 
     /**
      * This is a state handler for one of this operation's states. 
@@ -104,6 +147,8 @@ private:
     std::string mDestination;
     ::Ice::Current mIceCurrent;
 
+    std::vector<RouteSessionOp::OperationState> mReplicatedStates;
+
 }; // class RouteSessionOperation
 
 	
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index f6163a8..3863b03 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -143,8 +143,6 @@ public:
     }
 
 public:
-    AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mStateReplicator;
-
     Event::RoutingEventsPrx mEventTopic; // Using one-way proxy. 
     boost::mutex mLock;
 
@@ -152,7 +150,6 @@ private:
     Ice::ObjectAdapterPtr mAdapter;
     bool mInitialized;
     bool mActive;
-
 };
 
 /**
@@ -168,11 +165,6 @@ void RoutingServiceEventPublisher::setActive(bool val)
     mImpl->setActive(val);
 }
 
-void RoutingServiceEventPublisher::setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
-{
-    mImpl->mStateReplicator = replicator;
-}
-
 /**
  * Send a message to the service's event topic to report a lookup event.
  */
@@ -204,27 +196,6 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
-    if (mImpl->isActive() && result == Event::SUCCESS)
-    {
-        try
-        {
-            // Push this information to the state replicator.
-            RoutingStateItemSeq setItems;
-
-            EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
-            addEndpointItem->key = locatorId;
-            addEndpointItem->locator = locator;
-            addEndpointItem->regExList = regexList;
... 409 lines suppressed ...


-- 
asterisk-scf/integration/routing.git



More information about the asterisk-scf-commits mailing list