[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Apr 18 22:30:00 CDT 2011
branch "route_replica" has been updated
via b4937c1744de2fb71fc48a364609e14f63462cef (commit)
from 3565da61eef05819148debaa444cf4cef47601cd (commit)
Summary of changes:
config/routingtest-integ.config.in | 42 +++++--
local-slice/BasicRoutingStateReplicationIf.ice | 10 +-
src/BasicRoutingServiceApp.cpp | 12 +-
src/BasicRoutingStateReplicatorApp.cpp | 4 +-
...nectBridgedSessionsWithDestinationOperation.cpp | 8 +-
src/EndpointRegistry.cpp | 11 +-
src/OperationReplicaCache.cpp | 143 +++++++++++++-------
src/OperationReplicaCache.h | 2 +-
src/RouteSessionOperation.cpp | 8 +-
src/RoutingStateReplicatorListener.cpp | 135 +++++++++++++------
src/RoutingStateReplicatorListener.h | 17 ++-
test/CMakeLists.txt | 4 +
test/TestRouting.cpp | 134 +++++++++----------
13 files changed, 322 insertions(+), 208 deletions(-)
- Log -----------------------------------------------------------------
commit b4937c1744de2fb71fc48a364609e14f63462cef
Author: Ken Hunt <ken.hunt at digium.com>
Date: Mon Apr 18 22:29:21 2011 -0500
More updates.
diff --git a/config/routingtest-integ.config.in b/config/routingtest-integ.config.in
index 8365b39..1c64594 100644
--- a/config/routingtest-integ.config.in
+++ b/config/routingtest-integ.config.in
@@ -1,15 +1,24 @@
# This is a configuration file a single process test of the Routing Service.
+#Ice.Admin.Endpoints=tcp -p 10006
+#Ice.Admin.InstanceName=IceBox
+IceBox.InstanceName=IceBox
+IceBox.ServiceManager.Endpoints=tcp -p 10007
+
IceBox.InheritProperties=1
-IceBox.Service.ServiceDiscovery=${service_locator_bindir}/src at service_locator:create
-IceBox.Service.BasicRoutingService=../src at BasicRoutingService:create
+IceBox.Service.ServiceDiscovery=service_locator:create
+IceBox.Service.RoutingService=BasicRoutingService:create
+IceBox.Service.RoutingService2=BasicRoutingService:create
+
# Boost Test arguments can be passed here.
-# IceBox.Service.RoutingTest = RoutingTest:create --log_level=all
+#IceBox.Service.RoutingTest = RoutingTest:create --log_level=all
IceBox.Service.RoutingTest = RoutingTest:create
-IceBox.LoadOrder=ServiceDiscovery,BasicRoutingService,RoutingTest
+IceBox.Service.Replicator=BasicRoutingStateReplicator:create
+
+IceBox.LoadOrder=ServiceDiscovery,Replicator,RoutingService,RoutingService2,RoutingTest
# Where to find the Service Locator manager. We need the Service Locator in order to be able to plug in to the Asterisk SCF system Discovery mechanisms.
LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -p 4422
@@ -26,12 +35,25 @@ BridgeManager.ServiceLocatorId=BridgeService
###########################################
# Routing Service properties
-BasicRoutingServiceAdapter.Endpoints=tcp -p 10050
-
-BasicRoutingServiceAdapter.ThreadPool.Size=4
-BasicRoutingServiceAdapter.ThreadPool.SizeMax=10
-BasicRoutingServiceAdapter.ThreadPool.SizeWarn=9
-
+RoutingService.Endpoints=tcp -p 10050
+RoutingService.ThreadPool.Size=4
+RoutingService.ThreadPool.SizeMax=10
+RoutingService.ThreadPool.SizeWarn=9
+RoutingService.Standby=no
+RoutingService.StateReplicatorName=Replicator
+
+RoutingService2.Endpoints=tcp -p 10051
+RoutingService2.ThreadPool.Size=4
+RoutingService2.ThreadPool.SizeMax=10
+RoutingService2.ThreadPool.SizeWarn=9
+RoutingService2.Standby=yes
+RoutingService2.StateReplicatorName=Replicator
+
+Replicator.InstanceName=Replicator
+Replicator.RoutingReplicator.Endpoints=default -p 10052
+Replicator.RoutingReplicator.ThreadPool.Size=4
+
+# AsteriskSCF.RoutingService.logger=Debug
###########################################
# Test properties
diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
index 676538d..01b1579 100644
--- a/local-slice/BasicRoutingStateReplicationIf.ice
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -62,6 +62,7 @@ module V1
interface RoutingStateReplicatorListener
{
void stateRemoved(Ice::StringSeq itemKeys);
+ void stateRemovedForItems(RoutingStateItemSeq items);
void stateSet(RoutingStateItemSeq items);
};
@@ -74,6 +75,7 @@ module V1
void removeListener(RoutingStateReplicatorListener *listener);
void setState (RoutingStateItemSeq items);
void removeState(Ice::StringSeq items);
+ void removeStateForItems(RoutingStateItemSeq items);
idempotent RoutingStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RoutingStateItemSeq getAllState();
};
@@ -181,14 +183,6 @@ module V1
};
/**
- * Represents a removed endpoint locator.
- * The key (in the base state item) is the locatorId.
- */
- class EndpointLocatorRemove extends RoutingStateItem
- {
- };
-
- /**
* Represents a change of the endpoint locators
* managed ids.
* The key (in the base state item) is the locatorId.
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 348d871..07031ae 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -303,7 +303,9 @@ void BasicRoutingServiceApp::listenToStateReplicator()
}
/**
- *
+ * Unregister as a listener to our state replicator.
+ * A component in active mode doesn't neeed to listen to
+ * state replication data.
*/
void BasicRoutingServiceApp::stopListeningToStateReplicator()
{
@@ -434,7 +436,7 @@ void BasicRoutingServiceApp::locateStateReplicator(bool isActive)
{
BasicRoutingService::V1::RoutingStateReplicatorParamsPtr replicatorParams = new BasicRoutingService::V1::RoutingStateReplicatorParams();
replicatorParams->category = BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
- replicatorParams->name = mCommunicator->getProperties()->getPropertyWithDefault("Sip.StateReplicatorName", "default");
+ replicatorParams->name = mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".StateReplicatorName", "default");
AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> replicator(mServiceLocator, replicatorParams, lg);
@@ -450,9 +452,9 @@ void BasicRoutingServiceApp::initialize()
try
{
// Create the adapter.
- mAdapter = mCommunicator->createObjectAdapter("BasicRoutingServiceAdapter");
+ mAdapter = mCommunicator->createObjectAdapter(mAppName);
- bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault("BasicRoutingService.Standby", "no") == "yes");
+ bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".Standby", "no") == "yes");
// Create the replication context.
ReplicationContextPtr replicationContext(new ReplicationContext(isActive));
@@ -509,7 +511,7 @@ void BasicRoutingServiceApp::initialize()
mAdapter->add(mReplicaManagement, mCommunicator->stringToIdentity(ReplicaServiceId));
// Create and publish our state replicator listener interface.
- mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry, mOperationReplicaCache);
+ mReplicatorListener = new RoutingStateReplicatorListenerImpl(mEndpointRegistry, mOperationReplicaCache);
mReplicatorListenerProxy = RoutingStateReplicatorListenerPrx::uncheckedCast(mAdapter->addWithUUID(mReplicatorListener));
if (isActive)
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 7f8f704..909d228 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -173,7 +173,7 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
ServiceLocatorParamsPtr discoveryParams = new ServiceLocatorParams();
discoveryParams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
- string replicatorName = ic->getProperties()->getPropertyWithDefault("RoutingStateReplicator.Name", "default");
+ string replicatorName = ic->getProperties()->getPropertyWithDefault(mAppName + ".InstanceName", "default");
BasicRoutingStateReplicatorCompare* nameCompare = new BasicRoutingStateReplicatorCompare(replicatorName);
ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(mAdapter->addWithUUID(nameCompare));
@@ -207,7 +207,7 @@ void BasicRoutingStateReplicatorService::deregisterFromServiceLocator()
void BasicRoutingStateReplicatorService::initialize(const std::string appName, const Ice::CommunicatorPtr& ic)
{
- mAdapter = ic->createObjectAdapter("BasicRoutingStateReplicator");
+ mAdapter = ic->createObjectAdapter(appName + ".RoutingReplicator");
// setup logging client
mIceLogger = createIceLogger(mAdapter);
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 3901dd0..c3f4903 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -92,8 +92,8 @@ public:
setItems.push_back(item);
mReplicationContext->getReplicatorService()->setState(setItems);
- // Cache the keys of all pushed items.
- mReplicatedStateKeys.push_back(item->key);
+ // Cache the replicated items.
+ mReplicatedState.push_back(item);
}
/**
@@ -148,7 +148,7 @@ public:
{
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions from the state replicator.
- mReplicationContext->getReplicatorService()->removeState(mReplicatedStateKeys);
+ mReplicationContext->getReplicatorService()->removeStateForItems(mReplicatedState);
}
}
catch(...)
@@ -168,7 +168,7 @@ public:
}
private:
- Ice::StringSeq mReplicatedStateKeys;
+ RoutingStateItemSeq mReplicatedState;
std::string mTransactionId;
ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
ReplicationContextPtr mReplicationContext;
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index fb3207c..7075940 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -135,14 +135,13 @@ public:
try
{
// Push this information to the state replicator.
- RoutingStateItemSeq setItems;
-
- EndpointLocatorRemovePtr removeEndpointItem(new EndpointLocatorRemove());
- removeEndpointItem->key = locatorId;
+ RoutingStateItemSeq removeItems;
- setItems.push_back(removeEndpointItem);
+ EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
+ addEndpointItem->key = locatorId;
+ removeItems.push_back(addEndpointItem);
- mReplicationContext->getReplicatorService()->setState(setItems);
+ mReplicationContext->getReplicatorService()->removeStateForItems(removeItems);
}
catch(const Ice::Exception& e)
{
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index e1c52b5..a79d5db 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -13,12 +13,15 @@
* the GNU General Public License Version 2. See the LICENSE.txt file
* at the top of the source tree.
*/
+#include <boost/thread/locks.hpp>
+
#include <AsteriskSCF/Threading/WorkQueue.h>
#include "OperationReplicaCache.h"
#include "SessionRouterOperation.h"
using namespace AsteriskSCF::Threading;
+using namespace AsteriskSCF::BasicRoutingService::V1;
namespace AsteriskSCF
{
@@ -35,8 +38,6 @@ typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationSta
* When an item is called for from the cache, we'll apply all the available state updates we have,
* in order, up to the highest we have (without missing a state, since each state depends on the results
* of the previous.)
- *
- * NOTE: TBD...This class needs some templatized functions to avoid all the duplication in the case statements.
*/
template<typename T>
struct OperationReplicaItem
@@ -59,8 +60,12 @@ public:
}
SessionContextPtr mSessionContext;
+
RouteSessionMapType routeSessionReplicas;
ConnectBridgeWithDestMapType connectBridgedWithDestReplicas;
+
+ boost::shared_mutex mRouteSessionLock;
+ boost::shared_mutex mConnectBridgedWithDestLock;
};
OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionContext)
@@ -68,56 +73,82 @@ OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionCon
{
}
-
-void OperationReplicaCache::cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item)
+/**
+ * For a specific type of operation, cache the update item.
+ */
+template<typename M, typename O, typename OPtr>
+void cacheIt(M& map, const OperationStateItemPtr& item, const SessionContextPtr& context)
{
- switch(type)
- {
- case ROUTE_SESSION_OP:
- {
- // See if this transaction is in the cache.
- RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(item->transactionId);
- if (i == mPriv->routeSessionReplicas.end())
- {
- // Add an entry to the cache.
- OperationReplicaItem<RouteSessionOperationPtr> replica;
- mPriv->routeSessionReplicas[item->transactionId] = replica;
+ // See if this transaction is in the cache.
+ M::iterator i = map.find(item->transactionId);
+ if (i == map.end())
+ {
+ // Add an entry to the cache.
+ OperationReplicaItem<OPtr> replica;
+ map[item->transactionId] = replica;
- i = mPriv->routeSessionReplicas.find(item->transactionId);
- }
- (*i).second.mItems[item->key] = item;
+ i = map.find(item->transactionId);
+ }
+ (*i).second.mItems[item->key] = item;
- // If we haven't created the replica yet, do so now.
- if ((*i).second.mOperation.get() == 0)
- {
- RouteSessionOperationPtr work(RouteSessionOperation::createReplica(mPriv->mSessionContext));
- (*i).second.mOperation = work;
- }
+ // If we haven't created the replica yet, do so now.
+ if ((*i).second.mOperation.get() == 0)
+ {
+ OPtr work(O::createReplica(context));
+ (*i).second.mOperation = work;
+ }
- // Update the replicated object with newest state update.
- (*i).second.mOperation->reflectUpdate(item);
- }
- break;
+ // Update the replicated object with newest state update.
+ (*i).second.mOperation->reflectUpdate(item);
+}
- case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
- break;
+void OperationReplicaCache::cacheOperation(OperationType type, const OperationStateItemPtr& item)
+{
+ switch(type)
+ {
+ case ROUTE_SESSION_OP:
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
- case CONNECT_BRIDGED_SESSIONS_OP:
- // Not replicating this type.
- break;
+ cacheIt<RouteSessionMapType,
+ RouteSessionOperation,
+ RouteSessionOperationPtr>(mPriv->routeSessionReplicas,
+ item,
+ mPriv->mSessionContext);
}
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
+
+ cacheIt<ConnectBridgeWithDestMapType,
+ ConnectBridgedSessionsWithDestinationOperation,
+ ConnectBridgedSessionsWithDestinationOperationPtr>(mPriv->connectBridgedWithDestReplicas,
+ item,
+ mPriv->mSessionContext); }
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_OP:
+ // Not replicating this type.
+ break;
+ }
}
-bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
+/**
+ * For a specific type of operation, fetch a cached entry if one is available.
+ */
+template<typename M, typename OPtr>
+bool fetchIt(M& map, const std::string& transactionId, OPtr& ref)
{
- ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
- if (i == mPriv->connectBridgedWithDestReplicas.end())
+ M::iterator i = map.find(transactionId);
+ if (i == map.end())
{
return false;
}
ref = (*i).second.mOperation;
- mPriv->connectBridgedWithDestReplicas.erase(i);
+ map.erase(i);
ref->fastForwardReplica();
@@ -126,26 +157,29 @@ bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string tr
bool OperationReplicaCache::fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref)
{
- RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
- if (i == mPriv->routeSessionReplicas.end())
- {
- return false;
- }
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
- ref = (*i).second.mOperation;
- mPriv->routeSessionReplicas.erase(i);
+ return fetchIt<RouteSessionMapType,
+ RouteSessionOperationPtr> (mPriv->routeSessionReplicas, transactionId, ref);
+}
- ref->fastForwardReplica();
+bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
- return true;
+ return fetchIt<ConnectBridgeWithDestMapType,
+ ConnectBridgedSessionsWithDestinationOperationPtr> (mPriv->connectBridgedWithDestReplicas, transactionId, ref);
}
-void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::string transactionId)
+void OperationReplicaCache::dropOperation(OperationType type, std::string transactionId)
{
+
switch(type)
{
case ROUTE_SESSION_OP:
{
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
+
RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
if (i != mPriv->routeSessionReplicas.end())
{
@@ -156,6 +190,8 @@ void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::s
case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
{
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
+
ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
if (i != mPriv->connectBridgedWithDestReplicas.end())
{
@@ -171,19 +207,32 @@ void OperationReplicaCache::clearCache(OperationType type)
switch(type)
{
case ROUTE_SESSION_OP:
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
mPriv->routeSessionReplicas.clear();
+ }
break;
case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
mPriv->connectBridgedWithDestReplicas.clear();
+ }
break;
}
}
void OperationReplicaCache::clearCache()
{
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
mPriv->routeSessionReplicas.clear();
+ }
+
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
mPriv->connectBridgedWithDestReplicas.clear();
+ }
}
} // end BasicRoutingService
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index cd6aa6a..0424881 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -49,7 +49,7 @@ public:
void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
bool fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref);
bool fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref);
- void dropRouteSessionOperation(OperationType type, std::string transactionId);
+ void dropOperation(OperationType type, std::string transactionId);
void clearCache(OperationType type);
void clearCache();
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 4c3dcc6..46ba0b6 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -92,8 +92,8 @@ public:
setItems.push_back(item);
mReplicationContext->getReplicatorService()->setState(setItems);
- // Cache the keys of all pushed items.
- mReplicatedStateKeys.push_back(item->key);
+ // Cache the replication state items.
+ mReplicatedState.push_back(item);
}
/**
@@ -150,7 +150,7 @@ public:
{
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions.
- mReplicationContext->getReplicatorService()->removeState(mReplicatedStateKeys);
+ mReplicationContext->getReplicatorService()->removeStateForItems(mReplicatedState);
}
}
catch(...)
@@ -169,7 +169,7 @@ public:
}
private:
- Ice::StringSeq mReplicatedStateKeys;
+ RoutingStateItemSeq mReplicatedState;
std::string mTransactionId;
RouteSessionOperationPtr mOperation;
ReplicationContextPtr mReplicationContext;
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index fdb2ac0..7232d85 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -17,72 +17,122 @@
#include <IceUtil/UUID.h>
#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/logger.h>
#include "RoutingStateReplicatorListener.h"
#include "OperationReplicaCache.h"
#include "RouteSessionOperation.h"
+#include "EndpointRegistry.h"
using namespace AsteriskSCF::BasicRoutingService::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
+}
namespace AsteriskSCF
{
namespace BasicRoutingService
{
-
-class RoutingStateReplicatorItem
-{
-public:
- RoutingStateReplicatorItem() { }
- ~RoutingStateReplicatorItem()
- {
-
- }
-
-private:
-
-};
-
/**
- * Hidden details of our RoutingStateReplicatorListener implementation.
- * This object listens for updates from the state replicator when
- * this service is in standby mode, and ensures that this instance of the
- * service is prepared to be activated.
+ * Private details of our RoutingStateReplicatorListener implementation.
*/
-struct RoutingStateReplicatorListenerImpl
+class RoutingStateReplicatorListenerPriv
{
public:
- RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+ RoutingStateReplicatorListenerPriv(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
: mId(IceUtil::generateUUID()),
mEndpointRegistry(registry),
mOperationReplicaCache(opCache)
{
}
- void removeStateNoticeImpl(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+ void stateRemovedForItems(const RoutingStateItemSeq& items, const Ice::Current& current)
{
+ // Method local visitor implementation for handling removal of state items.
+ class visitor : public AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitor
+ {
+ public:
+ visitor(RoutingStateReplicatorListenerPriv *impl) : mImpl(impl)
+ {
+ }
+
+ private:
+ RoutingStateReplicatorListenerPriv *mImpl;
+
+ void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
+ {
+ // The operation cache keeps all the collected state for an operation under the transaction id.
+ mImpl->mOperationReplicaCache->dropOperation(ROUTE_SESSION_OP, opState->transactionId);
+ }
+
+ void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
+ {
+ // Removing the Start state for this operation will clean it up.
+ }
+
+ void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr& opState)
+ {
+ // Removing the Start state for this operation will clean it up.
+ }
+
+ void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
+ {
+ mImpl->mOperationReplicaCache->dropOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState->transactionId);
+ }
+
+ void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
+ {
+ // Removing the Start state for this operation will clean it up.
+ }
+
+ void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr& opState)
+ {
+ // Removing the Start state for this operation will clean it up.
+ }
+
+ void visitEndpointLocatorAdd(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorAddPtr& item)
+ {
+ mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
+ }
+
+ void visitEndpointLocatorSetDestIds(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorSetDestIdsPtr& item)
+ {
+ // Nothing much can be done here without some type of rollback facility.
+ }
+
+ }; // end method-local visitor def
+
// Create the visitor. Smart pointer will cleanup when this method exits.
- /**
AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitorPtr v = new visitor(this);
- for (Ice::StringSeq::iterator s = itemKeys.begin(); s != itemKeys.end(); ++s)
+ for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
{
- //
+ (*item)->visit(v);
}
- **/
}
- void setStateNoticeImpl(const RoutingStateItemSeq& items, const Ice::Current& current)
+ void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+ {
+ lg(Error) << "Routing Service does not use key-based removes for state replication.";
+ }
+
+ void stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
{
+ // Method-local visitor class for handling new state.
+
class visitor : public AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitor
{
public:
- visitor(RoutingStateReplicatorListenerImpl *impl) : mImpl(impl)
+ visitor(RoutingStateReplicatorListenerPriv *impl) : mImpl(impl)
{
}
private:
- RoutingStateReplicatorListenerImpl *mImpl;
+ RoutingStateReplicatorListenerPriv *mImpl;
void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
{
@@ -119,11 +169,6 @@ public:
mImpl->mEndpointRegistry->addEndpointLocator(item->key, item->regExList, item->locator, ::Ice::Current());
}
- void visitEndpointLocatorRemove(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorRemovePtr& item)
- {
- mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
- }
-
void visitEndpointLocatorSetDestIds(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorSetDestIdsPtr& item)
{
mImpl->mEndpointRegistry->setEndpointLocatorDestinationIds(item->key, item->regExList, ::Ice::Current());
@@ -145,27 +190,31 @@ public:
boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
};
-RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
- : mImpl(new RoutingStateReplicatorListenerImpl(registry, opCache))
+RoutingStateReplicatorListenerImpl::RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+ : mImpl(new RoutingStateReplicatorListenerPriv(registry, opCache))
+{
+}
+
+RoutingStateReplicatorListenerImpl::~RoutingStateReplicatorListenerImpl()
{
}
-RoutingStateReplicatorListenerI::~RoutingStateReplicatorListenerI()
+void RoutingStateReplicatorListenerImpl::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
{
- delete mImpl;
+ mImpl->stateRemoved(itemKeys, current);
}
-void RoutingStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
+void RoutingStateReplicatorListenerImpl::stateRemovedForItems(const RoutingStateItemSeq& items, const Ice::Current& current)
{
- mImpl->removeStateNoticeImpl(itemKeys, current);
+ mImpl->stateRemovedForItems(items, current);
}
-void RoutingStateReplicatorListenerI::stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
+void RoutingStateReplicatorListenerImpl::stateSet(const RoutingStateItemSeq& items, const Ice::Current& current)
{
- mImpl->setStateNoticeImpl(items, current);
+ mImpl->stateSet(items, current);
}
-bool RoutingStateReplicatorListenerI::operator==(RoutingStateReplicatorListenerI &rhs)
+bool RoutingStateReplicatorListenerImpl::operator==(RoutingStateReplicatorListenerImpl &rhs)
{
return mImpl->mId == rhs.mImpl->mId;
}
diff --git a/src/RoutingStateReplicatorListener.h b/src/RoutingStateReplicatorListener.h
index f663327..61b8ea2 100644
--- a/src/RoutingStateReplicatorListener.h
+++ b/src/RoutingStateReplicatorListener.h
@@ -17,32 +17,37 @@
#pragma once
#include <Ice/Ice.h>
+#include <boost/shared_ptr.hpp>
+
#include <AsteriskSCF/StateReplicator.h>
+
#include "BasicRoutingStateReplicationIf.h"
-#include "EndpointRegistry.h"
#include "OperationReplicaCache.h"
namespace AsteriskSCF
{
namespace BasicRoutingService
{
+class RoutingStateReplicatorListenerPriv;
+
/**
* Our RoutingStateReplicatorListener implementation.
* This object listens for updates from the state replicator when
* this service is in standby mode, and ensures that this instance of the
* service has the latest dynamic state in case it is activated.
*/
-class RoutingStateReplicatorListenerI : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
+class RoutingStateReplicatorListenerImpl : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
{
public:
- RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
- ~RoutingStateReplicatorListenerI();
+ RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
+ ~RoutingStateReplicatorListenerImpl();
void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
+ void stateRemovedForItems(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
void stateSet(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
- bool operator==(RoutingStateReplicatorListenerI &rhs);
+ bool operator==(RoutingStateReplicatorListenerImpl &rhs);
private:
- struct RoutingStateReplicatorListenerImpl *mImpl;
+ boost::shared_ptr<RoutingStateReplicatorListenerPriv> mImpl;
};
};
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index c2935e1..25119da 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -16,10 +16,14 @@ asterisk_scf_component_add_file(RoutingTest MockEndpointLocator.cpp)
asterisk_scf_component_add_ice_libraries(RoutingTest IceStorm)
asterisk_scf_component_add_boost_libraries(RoutingTest unit_test_framework)
+
include_directories(${API_INCLUDE_DIR})
+include_directories(${utils_dir}/TestFixture/include)
+
asterisk_scf_component_build_icebox(RoutingTest)
target_link_libraries(RoutingTest asterisk-scf-api)
if(integrated_build STREQUAL "true")
icebox_add_test(RoutingTest ../config/routingtest-integ.config)
+ icebox_add_test(RoutingReplicaTest ../config/routingtest-replica.config)
endif()
diff --git a/test/TestRouting.cpp b/test/TestRouting.cpp
index 7213d63..acb2f39 100644
--- a/test/TestRouting.cpp
+++ b/test/TestRouting.cpp
@@ -14,6 +14,7 @@
* at the top of the source tree.
*/
#define BOOST_TEST_MODULE BasicRoutingServiceTestSuite
+
#define BOOST_TEST_NO_MAIN
#include <boost/test/unit_test.hpp>
@@ -23,6 +24,7 @@
#include <IceBox/IceBox.h>
#include <IceUtil/UUID.h>
+#include <AsteriskSCF/IceBoxBoostTest.h>
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
@@ -40,59 +42,6 @@ using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::RoutingTest;
-/* Cache the command line arguments so that Ice can be initialized within the global fixture. */
-struct ArgCacheType
-{
-public:
- int argc;
- char **argv;
- Ice::PropertiesPtr inheritedProps;
-};
-static ArgCacheType mCachedArgs;
-
-/**
- * Test service, for loading into icebox
- */
-class RoutingServiceTest : public IceBox::Service
-{
-public:
- void start(const std::string&, const Ice::CommunicatorPtr&, const Ice::StringSeq&);
- void stop();
-};
-
-void RoutingServiceTest::start(std::string const &name,
- Ice::CommunicatorPtr const &communicator,
- Ice::StringSeq const &args)
-{
- std::vector<char const *> argv;
- argv.push_back(name.c_str());
- for (Ice::StringSeq::const_iterator i = args.begin(); i != args.end(); ++i)
- {
- argv.push_back(i->c_str());
- }
- // null terminated list
- argv.push_back((char const *) 0);
-
- mCachedArgs.argc = argv.size() - 1;
- mCachedArgs.argv = (char**)&argv[0];
- mCachedArgs.inheritedProps = communicator->getProperties();
-
- int r = ::boost::unit_test::unit_test_main(&init_unit_test, mCachedArgs.argc, mCachedArgs.argv);
- exit(r);
-}
-
-void RoutingServiceTest::stop()
-{
-}
-
-extern "C"
-{
-ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(Ice::CommunicatorPtr communicator)
-{
- return new RoutingServiceTest;
-}
-}
-
/**
* Instantiate our shared data.
*/
@@ -117,11 +66,8 @@ struct GlobalIceFixture
int status = 0;
try
{
- Ice::InitializationData initData;
- initData.properties = mCachedArgs.inheritedProps;
-
// Set up incoming adapter. This is where we'll publish our proxies.
- SharedTestData::instance.communicatorIn = Ice::initialize(initData);
+ SharedTestData::instance.communicatorIn = IceBoxTestEnv.communicator;
Ice::PropertiesPtr props = SharedTestData::instance.communicatorIn->getProperties();
SharedTestData::instance.adapterIn = SharedTestData::instance.communicatorIn->createObjectAdapterWithEndpoints("TestRoutingAdapterIn", "default -p 10070");
@@ -139,18 +85,19 @@ struct GlobalIceFixture
// Now set up outgoing adapter. This will be used for proxies we want to call out to the
// the unit under test on.
+ Ice::InitializationData initData;
+ initData.properties = IceBoxTestEnv.communicator->getProperties();
SharedTestData::instance.communicatorOut = Ice::initialize(initData);
SharedTestData::instance.adapterOut = SharedTestData::instance.communicatorOut->createObjectAdapterWithEndpoints("TestRoutingAdapterOut", "default -p 10071");
// Get ref to Routing Service so we can test it. Getting direct for now, but
// need to test acquiring reference via ServiceLocator as well.
Ice::PropertiesPtr communicatorProps = SharedTestData::instance.communicatorOut->getProperties();
- string locatorProp = communicatorProps->getProperty("LocatorRegistry.Proxy");
Ice::ObjectPrx locatorObj = SharedTestData::instance.communicatorOut->propertyToProxy("LocatorRegistry.Proxy");
SharedTestData::instance.locatorRegistry = LocatorRegistryPrx::uncheckedCast(locatorObj);
// Get the ServiceLocator and ServiceLocator manager
-
+ // <TBD>
if (!SharedTestData::instance.locatorRegistry)
{
@@ -253,11 +200,7 @@ public:
{
try
{
- BOOST_TEST_MESSAGE("PerTestFixture initializing...");
-
SharedTestData::instance.locatorRegistry->addEndpointLocator("TestChannel", SharedTestData::instance.regExIds, SharedTestData::instance.endpointLocatorPrx);
-
- BOOST_TEST_MESSAGE("PerTestFixture initialized.");
}
catch (...)
{
@@ -269,12 +212,8 @@ public:
{
try
{
- BOOST_TEST_MESSAGE("PerTestFixture cleanup starting...");
-
SharedTestData::instance.locatorRegistry->removeEndpointLocator("TestChannel");
SharedTestData::instance.endpointLocator->perTestCleanup();
-
- BOOST_TEST_MESSAGE("PerTestFixture cleanup complete.");
}
catch(const std::exception &e)
{
@@ -311,7 +250,6 @@ BOOST_AUTO_TEST_CASE(AddAndRemoveEndpointLocator)
BOOST_FAIL("Exception removing EndpointLocator.");
}
- BOOST_TEST_MESSAGE("Completed AddAndRemoveEndpointLocator test.");
}
/**
@@ -339,8 +277,6 @@ BOOST_AUTO_TEST_CASE(AddEndpointLocatorTwice)
{
BOOST_FAIL("Exception removing EndpointLocator.");
}
-
- BOOST_TEST_MESSAGE("Completed AddEndpointLocatorTwice test.");
}
/**
@@ -370,8 +306,6 @@ BOOST_FIXTURE_TEST_CASE(LookupOwnEndpoint, PerTestFixture)
BOOST_FAIL("Unknown exception looking up our own endpoint.");
}
- BOOST_TEST_MESSAGE("Completed LookupOwnEndpoint test.");
-
}
/**
@@ -575,3 +509,59 @@ BOOST_FIXTURE_TEST_CASE(AttendedTransfer, PerTestFixture)
BOOST_FAIL("Unknown exception in AttendedTransfer:");
}
}
+
+/**
+ * A fixture for replication testing.
+ * Provides setup/teardown for a specific set of tests.
+ */
+struct ReplicationFixture
+{
+public:
+ ReplicationFixture()
+ {
+ try
+ {
+ }
+ catch (...)
+ {
+ }
+ }
+
+ ~ReplicationFixture()
+ {
+ try
+ {
+ }
+ catch (...)
+ {
+ }
+ }
+};
+
+/**
+ * The beginnings of a test of replication...<TBD>
+ */
+BOOST_FIXTURE_TEST_CASE(ReplicateEndpointLocator, ReplicationFixture)
+{
+
+ try
+ {
+ // Ice::ObjectAdapterPtr adapter = SharedTestData::instance.communicatorIn->createObjectAdapter("IceBox.ServiceManager");
+
+ Ice::Identity identity = SharedTestData::instance.communicatorOut->stringToIdentity("IceBox/ServiceManager");
+ Ice::ObjectPrx serviceObj = SharedTestData::instance.adapterOut->createDirectProxy(identity);
+
+ IceBox::ServiceManagerPrx serviceManager = IceBox::ServiceManagerPrx::checkedCast(serviceObj);
+
+ serviceManager->stopService("RoutingService");
+ }
+ catch(const Ice::Exception& e)
+ {
+ BOOST_FAIL(e.what());
+ }
+ catch(...)
+ {
+ BOOST_FAIL("Unable to stop primary routing service.");
+ }
+
+}
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list