[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Apr 15 18:58:52 CDT 2011
branch "route_replica" has been updated
via 911fe40828872c429d3b56a9f713f72cb70d6783 (commit)
from c0da173bc8199919b8da1a1f79be484f7c894098 (commit)
Summary of changes:
src/BasicRoutingServiceApp.cpp | 114 ++++++++++----------
src/BasicRoutingStateReplicatorApp.cpp | 17 +++-
src/CMakeLists.txt | 7 +-
...nectBridgedSessionsWithDestinationOperation.cpp | 28 ++++--
src/EndpointRegistry.cpp | 57 +++++------
src/EndpointRegistry.h | 8 +-
src/ReplicationContext.cpp | 90 +++++++++++++++
src/ReplicationContext.h | 69 ++++++++++++
src/RouteSessionOperation.cpp | 28 ++++--
src/RoutingServiceEventPublisher.cpp | 36 ++-----
src/RoutingServiceEventPublisher.h | 4 +-
...ener.cpp => RoutingStateReplicatorListener.cpp} | 31 ++++--
...plicator.h => RoutingStateReplicatorListener.h} | 6 -
src/SessionRouterOperation.h | 10 ++-
test/TestRouting.cpp | 7 +-
15 files changed, 350 insertions(+), 162 deletions(-)
create mode 100644 src/ReplicationContext.cpp
create mode 100644 src/ReplicationContext.h
rename src/{BasicRoutingStateReplicatorListener.cpp => RoutingStateReplicatorListener.cpp} (76%)
rename src/{BasicRoutingStateReplicator.h => RoutingStateReplicatorListener.h} (75%)
- Log -----------------------------------------------------------------
commit 911fe40828872c429d3b56a9f713f72cb70d6783
Author: Ken Hunt <ken.hunt at digium.com>
Date: Fri Apr 15 18:57:45 2011 -0500
Cleanup of the replication context for the entire component.
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 4b9d3c4..348d871 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,13 +30,14 @@
#include <AsteriskSCF/logger.h>
#include "AsteriskSCF/Threading/SimpleWorkQueue.h"
-#include "BasicRoutingStateReplicator.h"
+#include "RoutingStateReplicatorListener.h"
#include "LuaScriptProcessor.h"
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
#include "RoutingAdmin.h"
#include "SessionRouter.h"
#include "OperationReplicaCache.h"
+#include "ReplicationContext.h"
using namespace std;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -67,7 +68,8 @@ public:
mInitialized(false),
mRunning(false),
mSessionContext(new SessionContext()),
- mWorkQueue( new AsteriskSCF::Threading::SimpleWorkQueue("SessionRouterWorkQueue", lg))
+ mWorkQueue( new AsteriskSCF::Threading::SimpleWorkQueue("SessionRouterWorkQueue", lg)),
+ mListeningToReplicator(false)
{
}
@@ -80,6 +82,7 @@ public:
void activated();
void onStandby();
+ bool isActive();
public: // Overrides of IceBox::Service
virtual void start(const string& name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
@@ -88,13 +91,13 @@ public: // Overrides of IceBox::Service
private:
void initialize();
void locateBridgeManager();
- void locateStateReplicator();
+ void locateStateReplicator(bool isActive);
void registerWithServiceLocator();
void deregisterFromServiceLocator();
void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category);
+
void listenToStateReplicator();
void stopListeningToStateReplicator();
- void pushStateReplicator();
bool mDone;
bool mInitialized;
@@ -123,10 +126,13 @@ private:
EndpointRegistryPtr mEndpointRegistry;
// Replication support
- ReplicaPtr mReplicaService;
+ ReplicationContextPtr mReplicationContext;
+ ReplicaPtr mReplicaManagement;
AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
RoutingStateReplicatorListenerPtr mReplicatorListener;
+
RoutingStateReplicatorListenerPrx mReplicatorListenerProxy;
+ bool mListeningToReplicator;
// Implementation
Ice::ObjectAdapterPtr mAdapter;
@@ -193,43 +199,27 @@ private:
};
/**
- * This class provides implementation for the Replica interface.
- * It also tracks the active/standby state of this component.
+ * This class provides implementation for this component's Replica management interface.
*/
-class ReplicaImpl : public Replica
+class ReplicaManagement : public Replica
{
public:
/**
* Constructor.
* @param app
* @param adapter The adapter is assumed to have been activated.
- * @param active
*/
- ReplicaImpl(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter, bool active) : mApp(app), mAdapter(adapter), mActive(active)
+ ReplicaManagement(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter) : mApp(app), mAdapter(adapter)
{
- if (mActive)
- {
- activate();
- }
- else
- {
- standby();
- }
}
bool isActive(const Ice::Current&)
{
- return mActive;
- }
-
- bool isActive()
- {
- return mActive;
+ return mApp.isActive();
}
bool activate(const Ice::Current& = ::Ice::Current())
{
- mActive = true;
mApp.activated();
for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
@@ -242,7 +232,6 @@ public:
void standby(const Ice::Current& = ::Ice::Current())
{
- mActive = false;
mApp.onStandby();
for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
@@ -272,42 +261,44 @@ private:
*/
vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
- bool mActive;
-
BasicRoutingServiceApp& mApp;
};
+bool BasicRoutingServiceApp::isActive()
+{
+ return mReplicationContext->isComponentActive();
+}
+
void BasicRoutingServiceApp::activated()
{
- mEventPublisher->setActive(true);
- mEndpointRegistry->setActive(true);
+ mReplicationContext->setComponentActive();
stopListeningToStateReplicator();
}
void BasicRoutingServiceApp::onStandby()
{
- mEventPublisher->setActive(false);
- mEndpointRegistry->setActive(false);
+ mReplicationContext->setComponentStandby();
listenToStateReplicator();
}
/**
* Register as a listener to our state replicator.
- * This is how a standby component stays updated with dynamic state changes
- * in the active component.
+ * A component in standby mode will do this to monitor state changes
+ * being sent from the active component.
*/
void BasicRoutingServiceApp::listenToStateReplicator()
{
// Do we have a reference to our state replicator?
- if (mStateReplicator == 0)
+ if (mReplicationContext->getReplicatorService() == 0)
{
return;
}
// Are we in standby mode?
- if (mReplicaService->isActive() == false)
+ if (mReplicationContext->isComponentActive() == false)
{
- mStateReplicator->addListener(mReplicatorListenerProxy);
+ mReplicationContext->getReplicatorService()->addListener(mReplicatorListenerProxy);
+ mListeningToReplicator = true;
}
}
@@ -316,12 +307,13 @@ void BasicRoutingServiceApp::listenToStateReplicator()
*/
void BasicRoutingServiceApp::stopListeningToStateReplicator()
{
- if (mReplicaService->isActive() == true)
+ if (!mListeningToReplicator)
{
return;
}
- mStateReplicator->removeListener(mReplicatorListenerProxy);
+ mReplicationContext->getReplicatorService()->removeListener(mReplicatorListenerProxy);
+ mListeningToReplicator = false;
}
/**
@@ -438,25 +430,15 @@ void BasicRoutingServiceApp::locateBridgeManager()
/**
* Locate our State Replicator using the Service Locator.
*/
-void BasicRoutingServiceApp::locateStateReplicator()
+void BasicRoutingServiceApp::locateStateReplicator(bool isActive)
{
BasicRoutingService::V1::RoutingStateReplicatorParamsPtr replicatorParams = new BasicRoutingService::V1::RoutingStateReplicatorParams();
replicatorParams->category = BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
replicatorParams->name = mCommunicator->getProperties()->getPropertyWithDefault("Sip.StateReplicatorName", "default");
- AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> pw(mServiceLocator, replicatorParams, lg);
- mStateReplicator = pw;
-
- pushStateReplicator();
-}
+ AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> replicator(mServiceLocator, replicatorParams, lg);
-/**
- * Pushes the state replicator to everything in this components that needs to know.
- */
-void BasicRoutingServiceApp::pushStateReplicator()
-{
- mEndpointRegistry->setStateReplicator(mStateReplicator);
- mSessionContext->stateReplicator = mStateReplicator;
+ mReplicationContext->setReplicatorService(replicator);
}
/**
@@ -472,7 +454,11 @@ void BasicRoutingServiceApp::initialize()
bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault("BasicRoutingService.Standby", "no") == "yes");
- mEventPublisher = new RoutingServiceEventPublisher(mAdapter, isActive);
+ // Create the replication context.
+ ReplicationContextPtr replicationContext(new ReplicationContext(isActive));
+ mReplicationContext = replicationContext;
+
+ mEventPublisher = new RoutingServiceEventPublisher(mAdapter);
// setup the logger
ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mAdapter);
@@ -480,7 +466,7 @@ void BasicRoutingServiceApp::initialize()
// Create and configure the EndpointRegistry.
ScriptProcessor* scriptProcesor(new LuaScriptProcessor());
- mEndpointRegistry = new EndpointRegistry(scriptProcesor, mEventPublisher, isActive);
+ mEndpointRegistry = new EndpointRegistry(scriptProcesor, mEventPublisher, mReplicationContext);
// Publish the LocatorRegistry interface.
mAdapter->add(mEndpointRegistry, mCommunicator->stringToIdentity(RegistryLocatorObjectId));
@@ -494,7 +480,8 @@ void BasicRoutingServiceApp::initialize()
SessionContext *rawSessionContext(new SessionContext(mAdapter,
mEndpointRegistry,
mEventPublisher,
- mWorkQueue));
+ mWorkQueue,
+ mReplicationContext));
SessionContextPtr sessionContextPtr(rawSessionContext);
mSessionContext = sessionContextPtr;
@@ -518,13 +505,22 @@ void BasicRoutingServiceApp::initialize()
// Create and publish our Replica interface support. This interface allows this component
// to be activated or placed in standby mode.
- mReplicaService = new ReplicaImpl(*this, mAdapter, isActive);
- mAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
+ mReplicaManagement = new ReplicaManagement(*this, mAdapter);
+ mAdapter->add(mReplicaManagement, mCommunicator->stringToIdentity(ReplicaServiceId));
// Create and publish our state replicator listener interface.
mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry, mOperationReplicaCache);
mReplicatorListenerProxy = RoutingStateReplicatorListenerPrx::uncheckedCast(mAdapter->addWithUUID(mReplicatorListener));
+ if (isActive)
+ {
+ activated();
+ }
+ else
+ {
+ onStandby();
+ }
+
mAdapter->activate();
// Get a proxy to the interface for the Service Locator.
@@ -539,7 +535,7 @@ void BasicRoutingServiceApp::initialize()
locateBridgeManager();
- locateStateReplicator();
+ locateStateReplicator(mReplicationContext->isComponentActive());
}
/**
@@ -567,7 +563,7 @@ void BasicRoutingServiceApp::start(const string& name, const Ice::CommunicatorPt
// Register with the state replicator in case we are in standby mode.
// This is done here because during initialize(), the reference to the State Replicator isn't
- // yet available when then ReplicaImpl is created.
+ // yet available when Donatoe servant is created.
listenToStateReplicator();
mRunning = true;
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 4248064..7f8f704 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -23,9 +23,8 @@
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/logger.h>
#include <AsteriskSCF/Logger/IceLogger.h>
-
+#include <AsteriskSCF/StateReplicator.h>
#include "BasicRoutingStateReplicationIf.h"
-#include "BasicRoutingStateReplicator.h"
using namespace std;
using namespace AsteriskSCF::Core;
@@ -41,6 +40,20 @@ namespace
Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingServiceStateReplicator");
}
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+typedef AsteriskSCF::StateReplication::StateReplicator< AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicator,
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateItemPtr,
+ std::string,
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListenerPrx> RoutingStateReplicatorI;
+typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
+
+};
+};
+
class BasicRoutingStateReplicatorService : public IceBox::Service
{
public:
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8024a59..8c08966 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -38,8 +38,10 @@ asterisk_scf_component_add_file(BasicRoutingService SessionListener.h)
asterisk_scf_component_add_file(BasicRoutingService SessionListener.cpp)
asterisk_scf_component_add_file(BasicRoutingService OperationReplicaCache.h)
asterisk_scf_component_add_file(BasicRoutingService OperationReplicaCache.cpp)
-asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicator.h)
-asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicatorListener.cpp)
+asterisk_scf_component_add_file(BasicRoutingService ReplicationContext.h)
+asterisk_scf_component_add_file(BasicRoutingService ReplicationContext.cpp)
+asterisk_scf_component_add_file(BasicRoutingService RoutingStateReplicatorListener.h)
+asterisk_scf_component_add_file(BasicRoutingService RoutingStateReplicatorListener.cpp)
asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
@@ -88,7 +90,6 @@ include_directories(${utils_dir}/SmartProxy/include)
include_directories(${API_INCLUDE_DIR})
asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicatorApp.cpp)
-asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicator.h)
asterisk_scf_component_add_slice(BasicRoutingStateReplicator ../local-slice/BasicRoutingStateReplicationIf.ice)
asterisk_scf_component_add_ice_libraries(BasicRoutingStateReplicator IceStorm)
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index f624c49..eba6e25 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -45,9 +45,8 @@ class ConnectBridgedSessionsWithDestReplicatingListener : public SimpleStateMach
{
public:
ConnectBridgedSessionsWithDestReplicatingListener(ConnectBridgedSessionsWithDestinationOperationPtr op,
- const AsteriskSCF::SmartProxy::SmartProxy<
- AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
- : mOperation(op), mReplicator(replicator)
+ const ReplicationContextPtr& replication)
+ : mOperation(op), mReplicationContext(replication)
{
}
@@ -56,6 +55,11 @@ public:
*/
void stateExecutionStart(ConnectBridgedSessionsWithDestinationOp::OperationState state)
{
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
switch(state)
{
case ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP:
@@ -79,7 +83,7 @@ public:
RoutingStateItemSeq setItems;
setItems.push_back(item);
- mReplicator->setState(setItems);
+ mReplicationContext->getReplicatorService()->setState(setItems);
// Cache the keys of all pushed items.
mReplicatedStateKeys.push_back(item->key);
@@ -90,6 +94,11 @@ public:
*/
void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
{
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
switch(state)
{
case ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS:
@@ -126,9 +135,14 @@ public:
*/
void shutdown()
{
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions from the state replicator.
- mReplicator->removeState(mReplicatedStateKeys);
+ mReplicationContext->getReplicatorService()->removeState(mReplicatedStateKeys);
// Release our reference to the operation.
mOperation.reset();
@@ -145,7 +159,7 @@ public:
private:
Ice::StringSeq mReplicatedStateKeys;
ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
- AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mReplicator;
+ ReplicationContextPtr mReplicationContext;
}; // end ConnectBridgedSessionsWithDestReplicatingListener
@@ -204,7 +218,7 @@ ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDest
listener,
transactionId) );
- boost::shared_ptr<SimpleStateMachine<ConnectBridgedSessionsWithDestinationOp::OperationState>::StateMachineListener> replicatingListener(new ConnectBridgedSessionsWithDestReplicatingListener(op, context->stateReplicator));
+ boost::shared_ptr<SimpleStateMachine<ConnectBridgedSessionsWithDestinationOp::OperationState>::StateMachineListener> replicatingListener(new ConnectBridgedSessionsWithDestReplicatingListener(op, context->replicationContext));
op->addStateMachineListener(replicatingListener);
return op;
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 3c1fe6e..fb3207c 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -22,7 +22,6 @@
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
-#include "BasicRoutingStateReplicator.h"
#include "RoutingServiceEventPublisher.h"
#include "ScriptProcessor.h"
@@ -80,8 +79,12 @@ typedef map<std::string, RegisteredLocator> EndpointLocatorMap;
class EndpointRegistryPriv
{
public:
- EndpointRegistryPriv(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher, bool active) :
- mScriptProcessor(scriptProcessor), mEventPublisher(eventPublisher), mActive(active)
+ EndpointRegistryPriv(ScriptProcessor* scriptProcessor,
+ const RoutingEventsPtr& eventPublisher,
+ const ReplicationContextPtr& replicationContext) :
+ mScriptProcessor(scriptProcessor),
+ mEventPublisher(eventPublisher),
+ mReplicationContext(replicationContext)
{
}
@@ -121,7 +124,7 @@ public:
*/
void forwardRemoveEndpointLocator(const std::string& locatorId, Event::OperationResult result)
{
- if (!mActive)
+ if (!mReplicationContext->isComponentActive())
{
return;
}
@@ -139,7 +142,7 @@ public:
setItems.push_back(removeEndpointItem);
- mStateReplicator->setState(setItems);
+ mReplicationContext->getReplicatorService()->setState(setItems);
}
catch(const Ice::Exception& e)
{
@@ -159,7 +162,7 @@ public:
const EndpointLocatorPrx& locator,
Event::OperationResult result)
{
- if (!mActive)
+ if (!mReplicationContext->isComponentActive())
{
return;
}
@@ -179,7 +182,7 @@ public:
setItems.push_back(addEndpointItem);
- mStateReplicator->setState(setItems);
+ mReplicationContext->getReplicatorService()->setState(setItems);
}
catch(const Ice::Exception& e)
{
@@ -195,7 +198,7 @@ public:
const RegExSeq& regexList,
OperationResult result)
{
- if (!mActive)
+ if (!mReplicationContext->isComponentActive())
{
return;
}
@@ -205,16 +208,19 @@ public:
{
try
{
- // Push this information to the state replicator.
- RoutingStateItemSeq setItems;
+ if (mReplicationContext->isReplicating())
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq setItems;
- EndpointLocatorSetDestIdsPtr setDestIdsItem(new EndpointLocatorSetDestIds());
- setDestIdsItem->key = locatorId;
- setDestIdsItem->regExList = regexList;
+ EndpointLocatorSetDestIdsPtr setDestIdsItem(new EndpointLocatorSetDestIds());
+ setDestIdsItem->key = locatorId;
+ setDestIdsItem->regExList = regexList;
- setItems.push_back(setDestIdsItem);
+ setItems.push_back(setDestIdsItem);
- mStateReplicator->setState(setItems);
+ mReplicationContext->getReplicatorService()->setState(setItems);
+ }
}
catch(const Ice::Exception& e)
{
@@ -227,12 +233,10 @@ public:
boost::shared_mutex mLock;
- AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
-
boost::shared_ptr<ScriptProcessor> mScriptProcessor;
EndpointLocatorMap mEndpointLocatorMap;
const RoutingEventsPtr mEventPublisher;
- bool mActive;
+ ReplicationContextPtr mReplicationContext;
};
/**
@@ -333,19 +337,11 @@ typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
/**
* Constructor.
*/
-EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher, bool active) :
- mImpl(new EndpointRegistryPriv(scriptProcessor, eventPublisher, active))
-{
-}
-
-void EndpointRegistry::setActive(bool isActive)
+EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor,
+ const RoutingEventsPtr& eventPublisher,
+ const ReplicationContextPtr& replicationContext) :
+ mImpl(new EndpointRegistryPriv(scriptProcessor, eventPublisher, replicationContext))
{
- mImpl->mActive = isActive;
-}
-
-void EndpointRegistry::setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx>& replicator)
-{
- mImpl->mStateReplicator = replicator;
}
/**
@@ -462,7 +458,6 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
}
catch (...)
{
-
lg(Error) << "Exception adding EndpointLocator.";
mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::FAILURE);
return;
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index d655880..3f0b19d 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -20,7 +20,7 @@
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
#include <AsteriskSCF/SmartProxy.h>
-#include "BasicRoutingStateReplicationIf.h"
+#include "ReplicationContext.h"
namespace AsteriskSCF
{
@@ -35,11 +35,7 @@ class EndpointRegistry : public AsteriskSCF::Core::Routing::V1::LocatorRegistry
public:
EndpointRegistry(ScriptProcessor* scriptProcessor,
const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
- bool active);
-
- void setActive(bool isActive);
-
- void setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator);
+ const ReplicationContextPtr& replicationContext);
/**
* Configure the EndpointRegistry to use a different scriptProcessor than the
diff --git a/src/ReplicationContext.cpp b/src/ReplicationContext.cpp
new file mode 100644
index 0000000..803a562
--- /dev/null
+++ b/src/ReplicationContext.cpp
@@ -0,0 +1,90 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/thread/locks.hpp>
+
+#include "ReplicationContext.h"
+
+using namespace ::AsteriskSCF::SmartProxy;
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+class ReplicationContextPriv
+{
+public:
+ ReplicationContextPriv(bool componentIsActive) : mActive(componentIsActive)
+ {
+ }
+
+ ReplicatorServicePrx mReplicatorService;
+ bool mActive;
+
+ boost::shared_mutex mLock;
+};
+
+ReplicationContext::ReplicationContext(bool componentIsActive)
+ : mImpl(new ReplicationContextPriv(componentIsActive))
+{
+}
+
+bool ReplicationContext::isReplicating()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ return (mImpl->mActive && mImpl->mReplicatorService.initializeOnce());
+}
+
+bool ReplicationContext::isComponentActive()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ return mImpl->mActive;
+}
+
+void ReplicationContext::setComponentActive()
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mActive = true;
+
+}
+
+void ReplicationContext::setComponentStandby()
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mActive = false;
+}
+
+ReplicatorServicePrx ReplicationContext::getReplicatorService()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ return mImpl->mReplicatorService;
+}
+
+void ReplicationContext::setReplicatorService(const ReplicatorServicePrx& service)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mReplicatorService = service;
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ReplicationContext.h b/src/ReplicationContext.h
new file mode 100644
index 0000000..d3a6769
--- /dev/null
+++ b/src/ReplicationContext.h
@@ -0,0 +1,69 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/SmartProxy.h>
+#include "BasicRoutingStateReplicationIf.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+typedef AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> ReplicatorServicePrx;
+
+class ReplicationContextPriv;
+
+/**
+ * This class provides the component's classes with the context needed to perform replication.
+ */
+class ReplicationContext
+{
+public:
+ ReplicationContext(bool componentIsActive);
+
+ /**
+ * Returns indicator of whether this component is both active and has a valid
+ * proxy to it's state replicator.
+ */
+ bool isReplicating();
+
+ /**
+ * Indicates whether this component is in active (as opposed to standby) mode.
+ */
+ bool isComponentActive();
+
+ void setComponentActive();
+ void setComponentStandby();
+
+ /**
+ * Get a reference to the state replicator service.
+ */
+ ReplicatorServicePrx getReplicatorService();
+
+ /**
+ * Sets the reference to the state replicator service.
+ */
+ void setReplicatorService(const ReplicatorServicePrx& service);
+
+private:
+ boost::shared_ptr<ReplicationContextPriv> mImpl;
+};
+typedef boost::shared_ptr<ReplicationContext> ReplicationContextPtr;
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 0d84966..feb44ac 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -45,9 +45,8 @@ class RouteSessionReplicatingListener : public SimpleStateMachine<RouteSessionOp
{
public:
RouteSessionReplicatingListener(RouteSessionOperationPtr op,
- const AsteriskSCF::SmartProxy::SmartProxy<
- AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
- : mOperation(op), mReplicator(replicator)
+ const ReplicationContextPtr& replicationContext)
+ : mOperation(op), mReplicationContext(replicationContext)
{
}
@@ -56,6 +55,11 @@ public:
*/
void stateExecutionStart(RouteSessionOp::OperationState state)
{
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
switch(state)
{
case RouteSessionOp::STATE_LOOKUP:
@@ -79,7 +83,7 @@ public:
RoutingStateItemSeq setItems;
setItems.push_back(item);
- mReplicator->setState(setItems);
+ mReplicationContext->getReplicatorService()->setState(setItems);
// Cache the keys of all pushed items.
mReplicatedStateKeys.push_back(item->key);
@@ -90,6 +94,11 @@ public:
*/
void stateExecutionComplete(RouteSessionOp::OperationState state)
{
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
switch(state)
{
case RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS:
@@ -128,9 +137,14 @@ public:
*/
void shutdown()
{
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions.
- mReplicator->removeState(mReplicatedStateKeys);
+ mReplicationContext->getReplicatorService()->removeState(mReplicatedStateKeys);
// Release our reference to the operation.
mOperation.reset();
@@ -146,7 +160,7 @@ public:
private:
Ice::StringSeq mReplicatedStateKeys;
RouteSessionOperationPtr mOperation;
- AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mReplicator;
+ ReplicationContextPtr mReplicationContext;
};
/**
@@ -206,7 +220,7 @@ RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_r
transactionId));
// Create a listener for pushing replication data.
- boost::shared_ptr<SimpleStateMachine<RouteSessionOp::OperationState>::StateMachineListener> replicatingListener(new RouteSessionReplicatingListener(op, context->stateReplicator));
+ boost::shared_ptr<SimpleStateMachine<RouteSessionOp::OperationState>::StateMachineListener> replicatingListener(new RouteSessionReplicatingListener(op, context->replicationContext));
op->addStateMachineListener(replicatingListener);
return op;
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 3863b03..221ba85 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -43,8 +43,8 @@ namespace BasicRoutingService
class RoutingServiceEventPublisherPriv
{
public:
- RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter, bool active) :
- mAdapter(adapter), mInitialized(false), mActive(active)
+ RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter) :
+ mAdapter(adapter), mInitialized(false)
{
boost::lock_guard<boost::mutex> lock(mLock);
initialize();
@@ -132,16 +132,6 @@ public:
return mInitialized;
}
- void setActive(bool val)
- {
- mActive = val;
- }
-
- bool isActive()
- {
- return mActive;
- }
-
public:
Event::RoutingEventsPrx mEventTopic; // Using one-way proxy.
boost::mutex mLock;
@@ -149,20 +139,14 @@ public:
private:
Ice::ObjectAdapterPtr mAdapter;
bool mInitialized;
- bool mActive;
};
/**
* Class constructor.
*/
-RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter, bool active) :
- mImpl(new RoutingServiceEventPublisherPriv(adapter, active))
-{
-}
-
-void RoutingServiceEventPublisher::setActive(bool val)
+RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter) :
+ mImpl(new RoutingServiceEventPublisherPriv(adapter))
{
- mImpl->setActive(val);
}
/**
@@ -171,7 +155,7 @@ void RoutingServiceEventPublisher::setActive(bool val)
void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- if (!mImpl->isActive() || !mImpl->isInitialized())
+ if (!mImpl->isInitialized())
{
return;
}
@@ -196,7 +180,7 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (!mImpl->isActive() || !mImpl->isInitialized())
+ if (!mImpl->isInitialized())
{
return;
}
@@ -217,7 +201,7 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- if (!mImpl->isActive() || !mImpl->isInitialized())
+ if (!mImpl->isInitialized())
{
return;
}
@@ -239,7 +223,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (!mImpl->isActive() || !mImpl->isInitialized())
+ if (!mImpl->isInitialized())
{
return;
}
@@ -259,7 +243,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
*/
void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
{
- if (!mImpl->isActive() || !mImpl->isInitialized())
+ if (!mImpl->isInitialized())
{
return;
}
@@ -279,7 +263,7 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
*/
void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
{
- if (!mImpl->isActive() || !mImpl->isInitialized())
+ if (!mImpl->isInitialized())
{
return;
}
diff --git a/src/RoutingServiceEventPublisher.h b/src/RoutingServiceEventPublisher.h
index 29bbce4..58461c8 100644
--- a/src/RoutingServiceEventPublisher.h
+++ b/src/RoutingServiceEventPublisher.h
@@ -35,9 +35,7 @@ class RoutingServiceEventPublisherPriv;
class RoutingServiceEventPublisher : public ::AsteriskSCF::Core::Routing::V1::Event::RoutingEvents
{
public:
- RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter, bool active);
-
- void setActive(bool val);
+ RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter);
// Overrides
diff --git a/src/BasicRoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
similarity index 76%
rename from src/BasicRoutingStateReplicatorListener.cpp
rename to src/RoutingStateReplicatorListener.cpp
index 53def89..fdb2ac0 100644
--- a/src/BasicRoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -19,8 +19,9 @@
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
-#include "BasicRoutingStateReplicator.h"
+#include "RoutingStateReplicatorListener.h"
#include "OperationReplicaCache.h"
+#include "RouteSessionOperation.h"
using namespace AsteriskSCF::BasicRoutingService::V1;
@@ -60,6 +61,15 @@ public:
void removeStateNoticeImpl(const Ice::StringSeq& itemKeys, const Ice::Current& current)
{
+ // Create the visitor. Smart pointer will cleanup when this method exits.
+ /**
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitorPtr v = new visitor(this);
+
+ for (Ice::StringSeq::iterator s = itemKeys.begin(); s != itemKeys.end(); ++s)
+ {
+ //
+ }
+ **/
}
void setStateNoticeImpl(const RoutingStateItemSeq& items, const Ice::Current& current)
@@ -74,29 +84,34 @@ public:
private:
RoutingStateReplicatorListenerImpl *mImpl;
- void visitRoutingStateItem(const ::AsteriskSCF::BasicRoutingService::V1::RoutingStateItemPtr&)
+ void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
{
-
+ mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
}
- void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr&)
+ void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
{
+ mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
}
- void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr&)
+ void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr& opState)
{
+ mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
}
- void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr&)
+ void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
{
+ mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
}
- void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr&)
+ void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
{
+ mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
}
- void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr&)
+ void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr& opState)
{
+ mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
}
void visitEndpointLocatorAdd(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorAddPtr& item)
diff --git a/src/BasicRoutingStateReplicator.h b/src/RoutingStateReplicatorListener.h
similarity index 75%
rename from src/BasicRoutingStateReplicator.h
rename to src/RoutingStateReplicatorListener.h
index c603bbc..f663327 100644
--- a/src/BasicRoutingStateReplicator.h
+++ b/src/RoutingStateReplicatorListener.h
@@ -26,12 +26,6 @@ namespace AsteriskSCF
{
namespace BasicRoutingService
{
-typedef AsteriskSCF::StateReplication::StateReplicator< AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicator,
- AsteriskSCF::BasicRoutingService::V1::RoutingStateItemPtr,
- std::string,
- AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListenerPrx> RoutingStateReplicatorI;
-typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
-
/**
* Our RoutingStateReplicatorListener implementation.
* This object listens for updates from the state replicator when
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index e87db54..c6d486c 100644
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@ -26,6 +26,7 @@
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <AsteriskSCF/SmartProxy.h>
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
+#include "ReplicationContext.h"
#include "SessionListener.h"
#include "EndpointRegistry.h"
@@ -48,11 +49,13 @@ public:
SessionContext(const Ice::ObjectAdapterPtr& adapter,
const EndpointRegistryPtr& registry,
const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& publisher,
- const boost::shared_ptr<AsteriskSCF::Threading::WorkQueue>& workQueue)
+ const boost::shared_ptr<AsteriskSCF::Threading::WorkQueue>& workQueue,
+ const ReplicationContextPtr& replicationContext)
: adapter(adapter),
endpointRegistry(registry),
eventPublisher(publisher),
- workQueue(workQueue)
+ workQueue(workQueue),
+ replicationContext(replicationContext)
{
}
@@ -62,9 +65,10 @@ public:
const EndpointRegistryPtr endpointRegistry;
const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr eventPublisher;
const boost::shared_ptr<AsteriskSCF::Threading::WorkQueue> workQueue;
+ const ReplicationContextPtr replicationContext;
AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx> bridgeManager;
- AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> stateReplicator;
+
};
typedef boost::shared_ptr<SessionContext> SessionContextPtr;
diff --git a/test/TestRouting.cpp b/test/TestRouting.cpp
index 01eb44c..f4a2ae1 100644
--- a/test/TestRouting.cpp
+++ b/test/TestRouting.cpp
@@ -147,7 +147,7 @@ struct GlobalIceFixture
Ice::PropertiesPtr communicatorProps = SharedTestData::instance.communicatorOut->getProperties();
string locatorProp = communicatorProps->getProperty("LocatorRegistry.Proxy");
Ice::ObjectPrx locatorObj = SharedTestData::instance.communicatorOut->propertyToProxy("LocatorRegistry.Proxy");
- SharedTestData::instance.locatorRegistry = LocatorRegistryPrx::checkedCast(locatorObj);
+ SharedTestData::instance.locatorRegistry = LocatorRegistryPrx::uncheckedCast(locatorObj);
// Get the ServiceLocator and ServiceLocator manager
@@ -174,6 +174,11 @@ struct GlobalIceFixture
cerr << ex << endl;
status = 1;
}
+ catch (const std::exception& ex)
+ {
+ cerr << ex.what() << endl;
+ status = 1;
+ }
catch (const char* msg)
{
cerr << msg << endl;
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list