[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Apr 18 22:30:00 CDT 2011


branch "route_replica" has been updated
       via  b4937c1744de2fb71fc48a364609e14f63462cef (commit)
      from  3565da61eef05819148debaa444cf4cef47601cd (commit)

Summary of changes:
 config/routingtest-integ.config.in                 |   42 +++++--
 local-slice/BasicRoutingStateReplicationIf.ice     |   10 +-
 src/BasicRoutingServiceApp.cpp                     |   12 +-
 src/BasicRoutingStateReplicatorApp.cpp             |    4 +-
 ...nectBridgedSessionsWithDestinationOperation.cpp |    8 +-
 src/EndpointRegistry.cpp                           |   11 +-
 src/OperationReplicaCache.cpp                      |  143 +++++++++++++-------
 src/OperationReplicaCache.h                        |    2 +-
 src/RouteSessionOperation.cpp                      |    8 +-
 src/RoutingStateReplicatorListener.cpp             |  135 +++++++++++++------
 src/RoutingStateReplicatorListener.h               |   17 ++-
 test/CMakeLists.txt                                |    4 +
 test/TestRouting.cpp                               |  134 +++++++++----------
 13 files changed, 322 insertions(+), 208 deletions(-)


- Log -----------------------------------------------------------------
commit b4937c1744de2fb71fc48a364609e14f63462cef
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Mon Apr 18 22:29:21 2011 -0500

    More updates.

diff --git a/config/routingtest-integ.config.in b/config/routingtest-integ.config.in
index 8365b39..1c64594 100644
--- a/config/routingtest-integ.config.in
+++ b/config/routingtest-integ.config.in
@@ -1,15 +1,24 @@
 # This is a configuration file a single process test of the Routing Service.
 
+#Ice.Admin.Endpoints=tcp -p 10006
+#Ice.Admin.InstanceName=IceBox
+IceBox.InstanceName=IceBox
+IceBox.ServiceManager.Endpoints=tcp -p 10007
+
 IceBox.InheritProperties=1
 
-IceBox.Service.ServiceDiscovery=${service_locator_bindir}/src at service_locator:create
-IceBox.Service.BasicRoutingService=../src at BasicRoutingService:create
+IceBox.Service.ServiceDiscovery=service_locator:create
+IceBox.Service.RoutingService=BasicRoutingService:create
+IceBox.Service.RoutingService2=BasicRoutingService:create
+
 
 # Boost Test arguments can be passed here.
-# IceBox.Service.RoutingTest = RoutingTest:create --log_level=all
+#IceBox.Service.RoutingTest = RoutingTest:create --log_level=all
 IceBox.Service.RoutingTest = RoutingTest:create
 
-IceBox.LoadOrder=ServiceDiscovery,BasicRoutingService,RoutingTest
+IceBox.Service.Replicator=BasicRoutingStateReplicator:create
+
+IceBox.LoadOrder=ServiceDiscovery,Replicator,RoutingService,RoutingService2,RoutingTest
 
 # Where to find the Service Locator manager. We need the Service Locator in order to be able to plug in to the Asterisk SCF system Discovery mechanisms.
 LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -p 4422
@@ -26,12 +35,25 @@ BridgeManager.ServiceLocatorId=BridgeService
 ###########################################
 # Routing Service properties
 
-BasicRoutingServiceAdapter.Endpoints=tcp -p 10050
-
-BasicRoutingServiceAdapter.ThreadPool.Size=4
-BasicRoutingServiceAdapter.ThreadPool.SizeMax=10
-BasicRoutingServiceAdapter.ThreadPool.SizeWarn=9
-
+RoutingService.Endpoints=tcp -p 10050
+RoutingService.ThreadPool.Size=4
+RoutingService.ThreadPool.SizeMax=10
+RoutingService.ThreadPool.SizeWarn=9
+RoutingService.Standby=no
+RoutingService.StateReplicatorName=Replicator
+
+RoutingService2.Endpoints=tcp -p 10051
+RoutingService2.ThreadPool.Size=4
+RoutingService2.ThreadPool.SizeMax=10
+RoutingService2.ThreadPool.SizeWarn=9
+RoutingService2.Standby=yes
+RoutingService2.StateReplicatorName=Replicator
+
+Replicator.InstanceName=Replicator
+Replicator.RoutingReplicator.Endpoints=default -p 10052
+Replicator.RoutingReplicator.ThreadPool.Size=4
+
+# AsteriskSCF.RoutingService.logger=Debug
 
 ###########################################
 # Test properties
diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
index 676538d..01b1579 100644
--- a/local-slice/BasicRoutingStateReplicationIf.ice
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -62,6 +62,7 @@ module V1
     interface RoutingStateReplicatorListener
     {
         void stateRemoved(Ice::StringSeq itemKeys);
+		void stateRemovedForItems(RoutingStateItemSeq items);
         void stateSet(RoutingStateItemSeq items);
     };
 
@@ -74,6 +75,7 @@ module V1
        void removeListener(RoutingStateReplicatorListener *listener);
        void setState (RoutingStateItemSeq items);
        void removeState(Ice::StringSeq items);
+	   void removeStateForItems(RoutingStateItemSeq items);
        idempotent RoutingStateItemSeq getState(Ice::StringSeq itemKeys);
        idempotent RoutingStateItemSeq getAllState();
     };
@@ -181,14 +183,6 @@ module V1
     };
 
 	/**  
-	 * Represents a removed endpoint locator. 
-	 * The key (in the base state item) is the locatorId. 
-	 */
-    class EndpointLocatorRemove extends RoutingStateItem
-    { 
-    };
-
-	/**  
 	 * Represents a change of the endpoint locators 
 	 * managed ids. 
 	 * The key (in the base state item) is the locatorId. 
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 348d871..07031ae 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -303,7 +303,9 @@ void BasicRoutingServiceApp::listenToStateReplicator()
 }
 
 /** 
- * 
+ * Unregister as a listener to our state replicator. 
+ * A component in active mode doesn't neeed to listen to
+ * state replication data. 
  */
 void BasicRoutingServiceApp::stopListeningToStateReplicator()
 {
@@ -434,7 +436,7 @@ void BasicRoutingServiceApp::locateStateReplicator(bool isActive)
 {
     BasicRoutingService::V1::RoutingStateReplicatorParamsPtr replicatorParams = new BasicRoutingService::V1::RoutingStateReplicatorParams();
     replicatorParams->category = BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
-    replicatorParams->name = mCommunicator->getProperties()->getPropertyWithDefault("Sip.StateReplicatorName", "default");
+    replicatorParams->name = mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".StateReplicatorName", "default");
 
     AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> replicator(mServiceLocator, replicatorParams, lg);
 
@@ -450,9 +452,9 @@ void BasicRoutingServiceApp::initialize()
     try
     {
         // Create the adapter.
-        mAdapter = mCommunicator->createObjectAdapter("BasicRoutingServiceAdapter");
+        mAdapter = mCommunicator->createObjectAdapter(mAppName);
 
-        bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault("BasicRoutingService.Standby", "no") == "yes");
+        bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".Standby", "no") == "yes");
 
         // Create the replication context.
         ReplicationContextPtr replicationContext(new ReplicationContext(isActive));
@@ -509,7 +511,7 @@ void BasicRoutingServiceApp::initialize()
         mAdapter->add(mReplicaManagement, mCommunicator->stringToIdentity(ReplicaServiceId));
 
         // Create and publish our state replicator listener interface.
-        mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry, mOperationReplicaCache);
+        mReplicatorListener = new RoutingStateReplicatorListenerImpl(mEndpointRegistry, mOperationReplicaCache);
         mReplicatorListenerProxy = RoutingStateReplicatorListenerPrx::uncheckedCast(mAdapter->addWithUUID(mReplicatorListener));
 
         if (isActive)
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 7f8f704..909d228 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -173,7 +173,7 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
         ServiceLocatorParamsPtr discoveryParams = new ServiceLocatorParams();
         discoveryParams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
 
-        string replicatorName = ic->getProperties()->getPropertyWithDefault("RoutingStateReplicator.Name", "default");
+        string replicatorName = ic->getProperties()->getPropertyWithDefault(mAppName + ".InstanceName", "default");
         BasicRoutingStateReplicatorCompare* nameCompare = new BasicRoutingStateReplicatorCompare(replicatorName);
         ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(mAdapter->addWithUUID(nameCompare));
 
@@ -207,7 +207,7 @@ void BasicRoutingStateReplicatorService::deregisterFromServiceLocator()
 
 void BasicRoutingStateReplicatorService::initialize(const std::string appName, const Ice::CommunicatorPtr& ic)
 {
-    mAdapter = ic->createObjectAdapter("BasicRoutingStateReplicator");
+    mAdapter = ic->createObjectAdapter(appName + ".RoutingReplicator");
 
     // setup logging client
     mIceLogger = createIceLogger(mAdapter);
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 3901dd0..c3f4903 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -92,8 +92,8 @@ public:
         setItems.push_back(item);
         mReplicationContext->getReplicatorService()->setState(setItems);
         
-        // Cache the keys of all pushed items.
-        mReplicatedStateKeys.push_back(item->key);
+        // Cache the replicated items.
+        mReplicatedState.push_back(item);
     }
 
     /**
@@ -148,7 +148,7 @@ public:
             {
                 // We just completed the entire operation. 
                 // Remove the items that represented this operation's state transitions from the state replicator.
-                mReplicationContext->getReplicatorService()->removeState(mReplicatedStateKeys);
+                mReplicationContext->getReplicatorService()->removeStateForItems(mReplicatedState);
             }
         }
         catch(...)
@@ -168,7 +168,7 @@ public:
     }
 
 private:
-    Ice::StringSeq mReplicatedStateKeys;
+    RoutingStateItemSeq mReplicatedState;
     std::string mTransactionId;
     ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
     ReplicationContextPtr mReplicationContext;
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index fb3207c..7075940 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -135,14 +135,13 @@ public:
             try
             {
                 // Push this information to the state replicator.
-                RoutingStateItemSeq setItems;
-
-                EndpointLocatorRemovePtr removeEndpointItem(new EndpointLocatorRemove());
-                removeEndpointItem->key = locatorId;
+                RoutingStateItemSeq removeItems;
 
-                setItems.push_back(removeEndpointItem);
+                EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
+                addEndpointItem->key = locatorId;
+                removeItems.push_back(addEndpointItem);
 
-                mReplicationContext->getReplicatorService()->setState(setItems);
+                mReplicationContext->getReplicatorService()->removeStateForItems(removeItems);
             }
             catch(const Ice::Exception& e)
             {
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index e1c52b5..a79d5db 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -13,12 +13,15 @@
  * the GNU General Public License Version 2. See the LICENSE.txt file
  * at the top of the source tree.
  */
+#include <boost/thread/locks.hpp>
+
 #include <AsteriskSCF/Threading/WorkQueue.h>
 
 #include "OperationReplicaCache.h"
 #include "SessionRouterOperation.h"
 
 using namespace AsteriskSCF::Threading;
+using namespace AsteriskSCF::BasicRoutingService::V1;
 
 namespace AsteriskSCF
 {
@@ -35,8 +38,6 @@ typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationSta
  * When an item is called for from the cache, we'll apply all the available state updates we have, 
  * in order, up to the highest we have (without missing a state, since each state depends on the results 
  * of the previous.) 
- *
- * NOTE: TBD...This class needs some templatized functions to avoid all the duplication in the case statements. 
  */
 template<typename T>
 struct OperationReplicaItem
@@ -59,8 +60,12 @@ public:
     }
 
     SessionContextPtr mSessionContext;
+
     RouteSessionMapType routeSessionReplicas;
     ConnectBridgeWithDestMapType connectBridgedWithDestReplicas;
+
+    boost::shared_mutex mRouteSessionLock;
+    boost::shared_mutex mConnectBridgedWithDestLock;
 };
 
 OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionContext) 
@@ -68,56 +73,82 @@ OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionCon
 {
 }
 
-
-void OperationReplicaCache::cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item)
+/**
+ * For a specific type of operation, cache the update item.
+ */
+template<typename M, typename O, typename OPtr>
+void cacheIt(M& map, const OperationStateItemPtr& item, const SessionContextPtr& context)
 {
-        switch(type)
-        {
-        case ROUTE_SESSION_OP:
-            {
-            // See if this transaction is in the cache.
-            RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(item->transactionId);
-            if (i == mPriv->routeSessionReplicas.end())
-            {
-                // Add an entry to the cache.
-                OperationReplicaItem<RouteSessionOperationPtr> replica;
-                mPriv->routeSessionReplicas[item->transactionId] = replica;
+    // See if this transaction is in the cache.
+    M::iterator i = map.find(item->transactionId);
+    if (i == map.end())
+    {
+        // Add an entry to the cache.
+        OperationReplicaItem<OPtr> replica;
+        map[item->transactionId] = replica;
 
-                i = mPriv->routeSessionReplicas.find(item->transactionId);
-            }
-            (*i).second.mItems[item->key] = item;
+        i = map.find(item->transactionId);
+    }
+    (*i).second.mItems[item->key] = item;
 
-            // If we haven't created the replica yet, do so now. 
-            if ((*i).second.mOperation.get() == 0)
-            {
-                RouteSessionOperationPtr work(RouteSessionOperation::createReplica(mPriv->mSessionContext));
-                (*i).second.mOperation = work;
-            }
+    // If we haven't created the replica yet, do so now. 
+    if ((*i).second.mOperation.get() == 0)
+    {
+        OPtr work(O::createReplica(context));
+        (*i).second.mOperation = work;
+    }
 
-            // Update the replicated object with newest state update. 
-            (*i).second.mOperation->reflectUpdate(item);
-            }
-            break;
+    // Update the replicated object with newest state update. 
+    (*i).second.mOperation->reflectUpdate(item);
+}
 
-        case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
-            break;
+void OperationReplicaCache::cacheOperation(OperationType type, const OperationStateItemPtr& item)
+{
+    switch(type)
+    {
+    case ROUTE_SESSION_OP:
+        {
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
 
-        case CONNECT_BRIDGED_SESSIONS_OP:
-            // Not replicating this type. 
-            break;
+        cacheIt<RouteSessionMapType, 
+                RouteSessionOperation, 
+                RouteSessionOperationPtr>(mPriv->routeSessionReplicas, 
+                                          item,
+                                          mPriv->mSessionContext);
         }
+            break;
+
+    case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+        {
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
+
+        cacheIt<ConnectBridgeWithDestMapType, 
+                ConnectBridgedSessionsWithDestinationOperation, 
+                ConnectBridgedSessionsWithDestinationOperationPtr>(mPriv->connectBridgedWithDestReplicas, 
+                                                                    item,
+                                                                    mPriv->mSessionContext);            }
+        break;
+
+    case CONNECT_BRIDGED_SESSIONS_OP:
+        // Not replicating this type. 
+        break;
+    }
 }
 
-bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
+/** 
+ * For a specific type of operation, fetch a cached entry if one is available.
+ */
+template<typename M, typename OPtr>
+bool fetchIt(M& map, const std::string& transactionId, OPtr& ref)
 {
-    ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
-    if (i == mPriv->connectBridgedWithDestReplicas.end())
+    M::iterator i = map.find(transactionId);
+    if (i == map.end())
     {
         return false;
     }
 
     ref = (*i).second.mOperation;
-    mPriv->connectBridgedWithDestReplicas.erase(i);
+    map.erase(i);
 
     ref->fastForwardReplica();
 
@@ -126,26 +157,29 @@ bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string tr
 
 bool OperationReplicaCache::fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref)
 {
-    RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
-    if (i == mPriv->routeSessionReplicas.end())
-    {
-        return false;
-    }
+    boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
 
-    ref = (*i).second.mOperation;
-    mPriv->routeSessionReplicas.erase(i);
+    return fetchIt<RouteSessionMapType, 
+                   RouteSessionOperationPtr> (mPriv->routeSessionReplicas, transactionId, ref);
+}
 
-    ref->fastForwardReplica();
+bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
 
-    return true;
+    return fetchIt<ConnectBridgeWithDestMapType, 
+                   ConnectBridgedSessionsWithDestinationOperationPtr> (mPriv->connectBridgedWithDestReplicas, transactionId, ref);
 }
 
-void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::string transactionId)
+void OperationReplicaCache::dropOperation(OperationType type, std::string transactionId)
 {
+
     switch(type)
     {
     case ROUTE_SESSION_OP:
         {
+            boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
+
             RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
             if (i != mPriv->routeSessionReplicas.end())
             {
@@ -156,6 +190,8 @@ void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::s
 
     case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
         {
+            boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
+
             ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
             if (i != mPriv->connectBridgedWithDestReplicas.end())
             {
@@ -171,19 +207,32 @@ void OperationReplicaCache::clearCache(OperationType type)
     switch(type)
     {
     case ROUTE_SESSION_OP:
+        {
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
         mPriv->routeSessionReplicas.clear();
+        }
         break;
 
     case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+        {
+        boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
         mPriv->connectBridgedWithDestReplicas.clear();
+        }
         break;
     }
 }
 
 void OperationReplicaCache::clearCache()
 {
+    {
+    boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
     mPriv->routeSessionReplicas.clear();
+    }
+
+    {
+    boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
     mPriv->connectBridgedWithDestReplicas.clear();
+    }
 }
 
 } // end BasicRoutingService
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index cd6aa6a..0424881 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -49,7 +49,7 @@ public:
     void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
     bool fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref);
     bool fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref);
-    void dropRouteSessionOperation(OperationType type, std::string transactionId);
+    void dropOperation(OperationType type, std::string transactionId);
 
     void clearCache(OperationType type);
     void clearCache();
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 4c3dcc6..46ba0b6 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -92,8 +92,8 @@ public:
         setItems.push_back(item);
         mReplicationContext->getReplicatorService()->setState(setItems);
         
-        // Cache the keys of all pushed items.
-        mReplicatedStateKeys.push_back(item->key);
+        // Cache the replication state items.
+        mReplicatedState.push_back(item);
     }
 
     /**
@@ -150,7 +150,7 @@ public:
             {
                 // We just completed the entire operation. 
                 // Remove the items that represented this operation's state transitions.
-                mReplicationContext->getReplicatorService()->removeState(mReplicatedStateKeys);
+                mReplicationContext->getReplicatorService()->removeStateForItems(mReplicatedState);
             }
         }
         catch(...)
@@ -169,7 +169,7 @@ public:
     }
 
 private:
-    Ice::StringSeq mReplicatedStateKeys;
+    RoutingStateItemSeq mReplicatedState;
     std::string mTransactionId;
     RouteSessionOperationPtr mOperation;
     ReplicationContextPtr mReplicationContext;
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index fdb2ac0..7232d85 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -17,72 +17,122 @@
 #include <IceUtil/UUID.h>
 
 #include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/logger.h>
 
 #include "RoutingStateReplicatorListener.h"
 #include "OperationReplicaCache.h"
 #include "RouteSessionOperation.h"
+#include "EndpointRegistry.h"
 
 using namespace AsteriskSCF::BasicRoutingService::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
+}
 
 namespace AsteriskSCF
 {
 namespace BasicRoutingService
 {
-
-class RoutingStateReplicatorItem
-{
-public:
-    RoutingStateReplicatorItem() { }
-    ~RoutingStateReplicatorItem()
-    {
-
-    }
-
-private:
-
-};
-
 /** 
- * Hidden details of our RoutingStateReplicatorListener implementation. 
- * This object listens for updates from the state replicator when 
- * this service is in standby mode, and ensures that this instance of the
- * service is prepared to be activated. 
+ * Private details of our RoutingStateReplicatorListener implementation. 
  */
-struct RoutingStateReplicatorListenerImpl
+class RoutingStateReplicatorListenerPriv
 {
 public:
-   RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+   RoutingStateReplicatorListenerPriv(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
         : mId(IceUtil::generateUUID()),
           mEndpointRegistry(registry),
           mOperationReplicaCache(opCache)
    {
    }
 
-    void removeStateNoticeImpl(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+    void stateRemovedForItems(const RoutingStateItemSeq& items, const Ice::Current& current)
     {
+        // Method local visitor implementation for handling removal of state items. 
+	    class visitor : public AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitor
+	    {
+	    public:
+	        visitor(RoutingStateReplicatorListenerPriv *impl) : mImpl(impl)
+	        {
+	        }
+
+	    private:
+	        RoutingStateReplicatorListenerPriv *mImpl;
+
+            void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
+            {
+                // The operation cache keeps all the collected state for an operation under the transaction id. 
+                mImpl->mOperationReplicaCache->dropOperation(ROUTE_SESSION_OP, opState->transactionId);
+            }
+
+            void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
+            {
+                // Removing the Start state for this operation will clean it up. 
+            }
+
+            void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr& opState)
+            {
+                // Removing the Start state for this operation will clean it up. 
+            }
+
+            void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
+            {
+                mImpl->mOperationReplicaCache->dropOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState->transactionId);
+            }
+
+            void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
+            {
+                // Removing the Start state for this operation will clean it up. 
+            }
+
+            void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr& opState)
+            {
+                // Removing the Start state for this operation will clean it up. 
+            }
+
+            void visitEndpointLocatorAdd(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorAddPtr& item)
+            {
+                mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
+            }
+
+            void visitEndpointLocatorSetDestIds(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorSetDestIdsPtr& item)
+            {
+                // Nothing much can be done here without some type of rollback facility. 
+            }
+
+        }; // end method-local visitor def
+
         // Create the visitor. Smart pointer will cleanup when this method exits. 
-        /**
         AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitorPtr v = new visitor(this);
 
-        for (Ice::StringSeq::iterator s = itemKeys.begin(); s != itemKeys.end(); ++s)
+        for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
         {
-	        //
+	        (*item)->visit(v);
         }
-        **/
     }
 
-    void setStateNoticeImpl(const RoutingStateItemSeq& items, const Ice::Current& current)
+    void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+    {
+        lg(Error) << "Routing Service does not use key-based removes for state replication.";
+    }
+
+    void stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
     {
+        // Method-local visitor class for handling new state. 
+
 	    class visitor : public AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitor
 	    {
 	    public:
-	        visitor(RoutingStateReplicatorListenerImpl *impl) : mImpl(impl)
+	        visitor(RoutingStateReplicatorListenerPriv *impl) : mImpl(impl)
 	        {
 	        }
 
 	    private:
-	        RoutingStateReplicatorListenerImpl *mImpl;
+	        RoutingStateReplicatorListenerPriv *mImpl;
 
             void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
             {
@@ -119,11 +169,6 @@ public:
                 mImpl->mEndpointRegistry->addEndpointLocator(item->key, item->regExList, item->locator, ::Ice::Current());
             }
 
-            void visitEndpointLocatorRemove(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorRemovePtr& item)
-            {
-                mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
-            }
-
             void visitEndpointLocatorSetDestIds(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorSetDestIdsPtr& item)
             {
                 mImpl->mEndpointRegistry->setEndpointLocatorDestinationIds(item->key, item->regExList, ::Ice::Current());
@@ -145,27 +190,31 @@ public:
     boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
 };
 
-RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
-    : mImpl(new RoutingStateReplicatorListenerImpl(registry, opCache)) 
+RoutingStateReplicatorListenerImpl::RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+    : mImpl(new RoutingStateReplicatorListenerPriv(registry, opCache)) 
+{
+}
+
+RoutingStateReplicatorListenerImpl::~RoutingStateReplicatorListenerImpl()
 {
 }
 
-RoutingStateReplicatorListenerI::~RoutingStateReplicatorListenerI()
+void RoutingStateReplicatorListenerImpl::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
 {
-    delete mImpl;
+    mImpl->stateRemoved(itemKeys, current);
 }
 
-void RoutingStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+void RoutingStateReplicatorListenerImpl::stateRemovedForItems(const RoutingStateItemSeq& items, const Ice::Current& current)
 {
-    mImpl->removeStateNoticeImpl(itemKeys, current);
+    mImpl->stateRemovedForItems(items, current);
 }
 
-void RoutingStateReplicatorListenerI::stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
+void RoutingStateReplicatorListenerImpl::stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
 {
-    mImpl->setStateNoticeImpl(items, current);
+    mImpl->stateSet(items, current);
 }
 
-bool RoutingStateReplicatorListenerI::operator==(RoutingStateReplicatorListenerI &rhs)
+bool RoutingStateReplicatorListenerImpl::operator==(RoutingStateReplicatorListenerImpl &rhs)
 {
     return mImpl->mId == rhs.mImpl->mId;
 }
diff --git a/src/RoutingStateReplicatorListener.h b/src/RoutingStateReplicatorListener.h
index f663327..61b8ea2 100644
--- a/src/RoutingStateReplicatorListener.h
+++ b/src/RoutingStateReplicatorListener.h
@@ -17,32 +17,37 @@
 #pragma once
 
 #include <Ice/Ice.h>
+#include <boost/shared_ptr.hpp>
+
 #include <AsteriskSCF/StateReplicator.h>
+
 #include "BasicRoutingStateReplicationIf.h"
-#include "EndpointRegistry.h"
 #include "OperationReplicaCache.h"
 
 namespace AsteriskSCF
 {
 namespace BasicRoutingService
 {
+class RoutingStateReplicatorListenerPriv;
+
 /** 
  * Our RoutingStateReplicatorListener implementation. 
  * This object listens for updates from the state replicator when 
  * this service is in standby mode, and ensures that this instance of the
  * service has the latest dynamic state in case it is activated. 
  */
-class RoutingStateReplicatorListenerI : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
+class RoutingStateReplicatorListenerImpl : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
 {
 public:
-    RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
-    ~RoutingStateReplicatorListenerI();
+    RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
+    ~RoutingStateReplicatorListenerImpl();
     void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
+    void stateRemovedForItems(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
     void stateSet(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
-    bool operator==(RoutingStateReplicatorListenerI &rhs);
+    bool operator==(RoutingStateReplicatorListenerImpl &rhs);
 
 private:
-    struct RoutingStateReplicatorListenerImpl *mImpl;
+    boost::shared_ptr<RoutingStateReplicatorListenerPriv> mImpl;
 };
 
 };
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index c2935e1..25119da 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -16,10 +16,14 @@ asterisk_scf_component_add_file(RoutingTest MockEndpointLocator.cpp)
 
 asterisk_scf_component_add_ice_libraries(RoutingTest IceStorm)
 asterisk_scf_component_add_boost_libraries(RoutingTest unit_test_framework)
+
 include_directories(${API_INCLUDE_DIR})
+include_directories(${utils_dir}/TestFixture/include)
+
 asterisk_scf_component_build_icebox(RoutingTest)
 target_link_libraries(RoutingTest asterisk-scf-api)
 
 if(integrated_build STREQUAL "true")
    icebox_add_test(RoutingTest ../config/routingtest-integ.config)
+   icebox_add_test(RoutingReplicaTest ../config/routingtest-replica.config)
 endif()
diff --git a/test/TestRouting.cpp b/test/TestRouting.cpp
index 7213d63..acb2f39 100644
--- a/test/TestRouting.cpp
+++ b/test/TestRouting.cpp
@@ -14,6 +14,7 @@
  * at the top of the source tree.
  */
 #define BOOST_TEST_MODULE BasicRoutingServiceTestSuite
+
 #define BOOST_TEST_NO_MAIN
 
 #include <boost/test/unit_test.hpp>
@@ -23,6 +24,7 @@
 #include <IceBox/IceBox.h>
 #include <IceUtil/UUID.h>
 
+#include <AsteriskSCF/IceBoxBoostTest.h>
 #include <AsteriskSCF/Core/Routing/RoutingIf.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
@@ -40,59 +42,6 @@ using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::RoutingTest;
 
-/* Cache the command line arguments so that Ice can be initialized within the global fixture. */
-struct ArgCacheType
-{
-public:
-    int argc;
-    char **argv;
-    Ice::PropertiesPtr inheritedProps;
-};
-static ArgCacheType mCachedArgs;
-
-/**
- * Test service, for loading into icebox
- */
-class RoutingServiceTest : public IceBox::Service
-{
-public:
-    void start(const std::string&, const Ice::CommunicatorPtr&, const Ice::StringSeq&);
-    void stop();
-};
-
-void RoutingServiceTest::start(std::string const &name,
-    Ice::CommunicatorPtr const &communicator,
-    Ice::StringSeq const &args)
-{
-    std::vector<char const *> argv;
-    argv.push_back(name.c_str());
-    for (Ice::StringSeq::const_iterator i = args.begin(); i != args.end(); ++i)
-    {
-        argv.push_back(i->c_str());
-    }
-    // null terminated list
-    argv.push_back((char const *) 0);
-
-    mCachedArgs.argc = argv.size() - 1;
-    mCachedArgs.argv = (char**)&argv[0];
-    mCachedArgs.inheritedProps = communicator->getProperties();
-
-    int r = ::boost::unit_test::unit_test_main(&init_unit_test, mCachedArgs.argc, mCachedArgs.argv);
-    exit(r);
-}
-
-void RoutingServiceTest::stop()
-{
-}
-
-extern "C"
-{
-ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(Ice::CommunicatorPtr communicator)
-{
-    return new RoutingServiceTest;
-}
-}
-
 /**
  * Instantiate our shared data.
  */
@@ -117,11 +66,8 @@ struct GlobalIceFixture
         int status = 0;
         try
         {
-            Ice::InitializationData initData;
-            initData.properties = mCachedArgs.inheritedProps;
-
             // Set up incoming adapter. This is where we'll publish our proxies.
-            SharedTestData::instance.communicatorIn = Ice::initialize(initData);
+            SharedTestData::instance.communicatorIn = IceBoxTestEnv.communicator;
 
             Ice::PropertiesPtr props = SharedTestData::instance.communicatorIn->getProperties();
             SharedTestData::instance.adapterIn = SharedTestData::instance.communicatorIn->createObjectAdapterWithEndpoints("TestRoutingAdapterIn", "default -p 10070");
@@ -139,18 +85,19 @@ struct GlobalIceFixture
 
             // Now set up outgoing adapter. This will be used for proxies we want to call out to the
             // the unit under test on.
+            Ice::InitializationData initData;
+            initData.properties = IceBoxTestEnv.communicator->getProperties();
             SharedTestData::instance.communicatorOut = Ice::initialize(initData);
             SharedTestData::instance.adapterOut = SharedTestData::instance.communicatorOut->createObjectAdapterWithEndpoints("TestRoutingAdapterOut", "default -p 10071");
 
             // Get ref to Routing Service so we can test it. Getting direct for now, but
             // need to test acquiring reference via ServiceLocator as well.
             Ice::PropertiesPtr communicatorProps = SharedTestData::instance.communicatorOut->getProperties();
-            string locatorProp = communicatorProps->getProperty("LocatorRegistry.Proxy");
             Ice::ObjectPrx locatorObj = SharedTestData::instance.communicatorOut->propertyToProxy("LocatorRegistry.Proxy");
             SharedTestData::instance.locatorRegistry = LocatorRegistryPrx::uncheckedCast(locatorObj);
 
             // Get the ServiceLocator and ServiceLocator manager
-
+            // <TBD>
 
             if (!SharedTestData::instance.locatorRegistry)
             {
@@ -253,11 +200,7 @@ public:
     {
         try
         {
-            BOOST_TEST_MESSAGE("PerTestFixture initializing...");
-
             SharedTestData::instance.locatorRegistry->addEndpointLocator("TestChannel", SharedTestData::instance.regExIds, SharedTestData::instance.endpointLocatorPrx);
-
-            BOOST_TEST_MESSAGE("PerTestFixture initialized.");
         }
         catch (...)
         {
@@ -269,12 +212,8 @@ public:
     {
         try
         {
-            BOOST_TEST_MESSAGE("PerTestFixture cleanup starting...");
-
             SharedTestData::instance.locatorRegistry->removeEndpointLocator("TestChannel");
             SharedTestData::instance.endpointLocator->perTestCleanup();
-
-            BOOST_TEST_MESSAGE("PerTestFixture cleanup complete.");
         }
         catch(const std::exception &e)
         {
@@ -311,7 +250,6 @@ BOOST_AUTO_TEST_CASE(AddAndRemoveEndpointLocator)
         BOOST_FAIL("Exception removing EndpointLocator.");
     }
 
-    BOOST_TEST_MESSAGE("Completed AddAndRemoveEndpointLocator test.");
 }
 
 /**
@@ -339,8 +277,6 @@ BOOST_AUTO_TEST_CASE(AddEndpointLocatorTwice)
     {
          BOOST_FAIL("Exception removing EndpointLocator.");
     }
-
-    BOOST_TEST_MESSAGE("Completed AddEndpointLocatorTwice test.");
 }
 
 /**
@@ -370,8 +306,6 @@ BOOST_FIXTURE_TEST_CASE(LookupOwnEndpoint, PerTestFixture)
         BOOST_FAIL("Unknown exception looking up our own endpoint.");
     }
 
-    BOOST_TEST_MESSAGE("Completed LookupOwnEndpoint test.");
-
 }
 
 /**
@@ -575,3 +509,59 @@ BOOST_FIXTURE_TEST_CASE(AttendedTransfer, PerTestFixture)
         BOOST_FAIL("Unknown exception in AttendedTransfer:");
     }
 }
+
+/**
+ * A fixture for replication testing.
+ * Provides setup/teardown for a specific set of tests.
+ */
+struct ReplicationFixture
+{
+public:
+    ReplicationFixture()
+    {
+        try
+        {
+        }
+        catch (...)
+        {
+        }
+    }
+
+    ~ReplicationFixture()
+    {
+        try
+        {
+        }
+        catch (...)
+        {
+        }
+    }
+};
+
+/**
+ * The beginnings of a test of replication...<TBD>
+ */
+BOOST_FIXTURE_TEST_CASE(ReplicateEndpointLocator, ReplicationFixture)
+{
+
+   try
+   {
+       // Ice::ObjectAdapterPtr adapter = SharedTestData::instance.communicatorIn->createObjectAdapter("IceBox.ServiceManager");
+
+       Ice::Identity identity = SharedTestData::instance.communicatorOut->stringToIdentity("IceBox/ServiceManager");
+       Ice::ObjectPrx serviceObj = SharedTestData::instance.adapterOut->createDirectProxy(identity);
+
+       IceBox::ServiceManagerPrx serviceManager = IceBox::ServiceManagerPrx::checkedCast(serviceObj);
+      
+       serviceManager->stopService("RoutingService");
+   }
+   catch(const Ice::Exception& e)
+   {
+       BOOST_FAIL(e.what());
+   }
+   catch(...)
+   {
+       BOOST_FAIL("Unable to stop primary routing service.");
+   }
+
+}

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/routing.git



More information about the asterisk-scf-commits mailing list