[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Apr 21 18:17:54 CDT 2011
branch "route_replica" has been updated
via f5ffe27cceeb1c20efc7c4e662f88573655bbcd0 (commit)
from 633bee49be17bde2601421db442dcc95ab141fdb (commit)
Summary of changes:
config/routingtest-integ.config.in | 11 +-
src/BasicRoutingServiceApp.cpp | 251 +++++++++++++++++++------------
src/BasicRoutingStateReplicatorApp.cpp | 224 +++++++++++++++++++++--------
src/EndpointRegistry.cpp | 50 +++----
src/OperationReplicaCache.cpp | 194 ++-----------------------
src/OperationReplicaCache.h | 119 ++++++++++++++--
src/RoutingStateReplicatorListener.cpp | 16 +-
src/SessionRouter.cpp | 4 +-
8 files changed, 478 insertions(+), 391 deletions(-)
- Log -----------------------------------------------------------------
commit f5ffe27cceeb1c20efc7c4e662f88573655bbcd0
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Apr 21 18:17:17 2011 -0500
Incorporating review feedback.
diff --git a/config/routingtest-integ.config.in b/config/routingtest-integ.config.in
index 1c64594..5945cf2 100644
--- a/config/routingtest-integ.config.in
+++ b/config/routingtest-integ.config.in
@@ -1,5 +1,7 @@
# This is a configuration file a single process test of the Routing Service.
+Ice.Warn.UnknownProperties=0
+
#Ice.Admin.Endpoints=tcp -p 10006
#Ice.Admin.InstanceName=IceBox
IceBox.InstanceName=IceBox
@@ -36,13 +38,15 @@ BridgeManager.ServiceLocatorId=BridgeService
# Routing Service properties
RoutingService.Endpoints=tcp -p 10050
+RoutingService.ComponentService.Endpoints=tcp -p 10051
RoutingService.ThreadPool.Size=4
RoutingService.ThreadPool.SizeMax=10
RoutingService.ThreadPool.SizeWarn=9
RoutingService.Standby=no
RoutingService.StateReplicatorName=Replicator
-RoutingService2.Endpoints=tcp -p 10051
+RoutingService2.Endpoints=tcp -p 10052
+RoutingService2.ComponentService.Endpoints=tcp -p 10053
RoutingService2.ThreadPool.Size=4
RoutingService2.ThreadPool.SizeMax=10
RoutingService2.ThreadPool.SizeWarn=9
@@ -50,8 +54,9 @@ RoutingService2.Standby=yes
RoutingService2.StateReplicatorName=Replicator
Replicator.InstanceName=Replicator
-Replicator.RoutingReplicator.Endpoints=default -p 10052
-Replicator.RoutingReplicator.ThreadPool.Size=4
+Replicator.Endpoints=default -p 10054
+Replicator.ComponentService.Endpoints=default -p 10055
+Replicator.ThreadPool.Size=4
# AsteriskSCF.RoutingService.logger=Debug
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index f2a762f..36341fc 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -61,6 +61,12 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
+class ComponentServiceImpl;
+typedef ::IceUtil::Handle<ComponentServiceImpl> ComponentServiceImplPtr;
+
+class ReplicaManagement;
+typedef ::IceUtil::Handle<ReplicaManagement> ReplicaManagementPtr;
+
class BasicRoutingServiceApp : public IceBox::Service
{
public:
@@ -87,27 +93,26 @@ public:
void onStandby();
bool isActive();
- /**
- * Get an impementation-specific ID for createing service discovery guids.
- * This is not the same as the app name, which is configurable.
- */
- std::string getImplementationId() {return mImplementationId;}
+ ////// Overrides of IceBox::Service
-public: // Overrides of IceBox::Service
- virtual void start(const string& name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
+ virtual void start(const string& name, const ::Ice::CommunicatorPtr& ic, const ::Ice::StringSeq& args);
virtual void stop();
private:
+ void suspendService(bool shuttingDown);
void initialize();
void locateBridgeManager();
- void locateStateReplicator(bool isActive);
- void registerWithServiceLocator();
- void deregisterFromServiceLocator();
- void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category);
+ void locateStateReplicator();
+ void registerWithServiceLocator(bool includeComponentService);
+ void deregisterFromServiceLocator(bool includeComponentService);
void listenToStateReplicator();
void stopListeningToStateReplicator();
+ /**
+ * Get an impementation-specific ID for createing service discovery guids.
+ * This is not the same as the app name, which is configurable.
+ */
const std::string mImplementationId;
bool mDone;
bool mInitialized;
@@ -129,7 +134,7 @@ private:
// Our published interfaces.
BasicSessionRouterPtr mSessionRouter;
RoutingServiceAdminPtr mAdminInteface;
- ComponentServicePtr mComponentService;
+ ComponentServiceImplPtr mComponentService;
ComponentTestPtr mComponentTest;
AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> mBridgeManager;
RoutingServiceEventPublisherPtr mEventPublisher;
@@ -138,7 +143,7 @@ private:
// Replication support
ReplicationContextPtr mReplicationContext;
- ReplicaPtr mReplicaManagement;
+ ReplicaManagementPtr mReplicaManagement;
AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
RoutingStateReplicatorListenerPtr mReplicatorListener;
@@ -146,9 +151,11 @@ private:
bool mListeningToReplicator;
// Implementation
- Ice::ObjectAdapterPtr mAdapter;
- Ice::CommunicatorPtr mCommunicator;
+ ::Ice::ObjectAdapterPtr mAdapter;
+ ::Ice::ObjectAdapterPtr mComponentServiceAdapter;
+ ::Ice::CommunicatorPtr mCommunicator;
};
+typedef ::IceUtil::Handle<BasicRoutingServiceApp> BasicRoutingServiceAppPtr;
static const string RegistryLocatorObjectId("RoutingServiceLocatorRegistry");
static const string RoutingAdminObjectId("RoutingAdmin");
@@ -163,29 +170,34 @@ static const string ReplicaServiceId("BasicRoutingServiceReplica");
class ComponentServiceImpl : public ComponentService
{
public:
- ComponentServiceImpl(BasicRoutingServiceApp &app) :
+ ComponentServiceImpl(BasicRoutingServiceApp* app) :
mApp(app)
{
}
public: // Overrides of the ComponentService interface.
- void suspend(const Ice::Current&)
+ void suspend(const ::Ice::Current&)
+ {
+ mApp->suspend();
+ }
+
+ void resume(const ::Ice::Current&)
{
- mApp.suspend();
+ mApp->resume();
}
- void resume(const Ice::Current&)
+ void shutdown(const ::Ice::Current&)
{
- mApp.resume();
+ mApp->stop();
}
- void shutdown(const Ice::Current&)
+ void shutdownNotice()
{
- mApp.stop();
+ mApp = 0;
}
private:
- BasicRoutingServiceApp& mApp;
+ BasicRoutingServiceAppPtr mApp;
};
/**
@@ -237,18 +249,18 @@ public:
* @param app
* @param adapter The adapter is assumed to have been activated.
*/
- ReplicaManagement(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter) : mApp(app), mAdapter(adapter)
+ ReplicaManagement(BasicRoutingServiceApp* app, ::Ice::ObjectAdapterPtr adapter) : mApp(app), mAdapter(adapter)
{
}
- bool isActive(const Ice::Current&)
+ bool isActive(const ::Ice::Current&)
{
- return mApp.isActive();
+ return mApp->isActive();
}
- bool activate(const Ice::Current& = ::Ice::Current())
+ bool activate(const ::Ice::Current&)
{
- mApp.activated();
+ mApp->activated();
for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
{
@@ -258,9 +270,9 @@ public:
return true;
}
- void standby(const Ice::Current& = ::Ice::Current())
+ void standby(const ::Ice::Current&)
{
- mApp.onStandby();
+ mApp->onStandby();
for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
{
@@ -268,12 +280,12 @@ public:
}
}
- void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+ void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const ::Ice::Current&)
{
mListeners.push_back(listener);
}
- void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+ void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const ::Ice::Current&)
{
mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
}
@@ -282,14 +294,14 @@ private:
/**
* Pointer to the object adapter we exist on.
*/
- Ice::ObjectAdapterPtr mAdapter;
+ ::Ice::ObjectAdapterPtr mAdapter;
/**
* Listeners that we need to push state change notifications out to.
*/
vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
- BasicRoutingServiceApp& mApp;
+ BasicRoutingServiceAppPtr mApp;
};
bool BasicRoutingServiceApp::isActive()
@@ -351,7 +363,7 @@ void BasicRoutingServiceApp::stopListeningToStateReplicator()
* Helper function to add some parameters to one of our registered interfaces in the ServiceLocator, so that
* other components can look up our interfaces.
*/
-void BasicRoutingServiceApp::setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category)
+void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category)
{
// Add category as a parameter to enable other components look this component up.
ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams;
@@ -362,56 +374,58 @@ void BasicRoutingServiceApp::setCategory(const Discovery::V1::ServiceManagementP
/**
* Register this component's primary public interfaces with the Service Locator.
* This enables other Asterisk SCF components to locate the interfaces we are publishing.
+ *
+ * @param includeComponentService If true, registers our ComponentService interface
+ * in addition to all our other interfaces. While it is not uncommon to need to
+ * re-register most of our interfaces due to a pause()/resume(), we only deregister
+ * the ComponentService during shutdown. So this variable would typically be true only
+ * during initial startup.
*/
-void BasicRoutingServiceApp::registerWithServiceLocator()
+void BasicRoutingServiceApp::registerWithServiceLocator(bool includeComponentService)
{
- // Get a proxy to the management interface for the Service Locator, so we can add ourselves into the system discovery mechanisms.
- mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("LocatorServiceManagement.Proxy"));
-
- if (mServiceLocatorManagement == 0)
- {
- throw IceBox::FailureException(__FILE__, __LINE__, "Configuration error: Unable to obtain proxy for property `LocatorServiceManagement.Proxy'");
- }
-
try
{
// Register our RoutingAdmin interface with the Service Locator.
- Ice::ObjectPrx adminObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RoutingAdminObjectId));
+ ::Ice::ObjectPrx adminObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RoutingAdminObjectId));
RoutingServiceAdminPrx adminPrx = RoutingServiceAdminPrx::checkedCast(adminObjectPrx);
- string adminServiceGuid(getImplementationId() + "." + Routing::V1::RoutingServiceAdminDiscoveryCategory); // Should be unique for reporting.
+ string adminServiceGuid(mImplementationId + "." + Routing::V1::RoutingServiceAdminDiscoveryCategory); // Should be unique for reporting.
mAdminManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(adminPrx, adminServiceGuid));
setCategory(mAdminManagement, Routing::V1::RoutingServiceAdminDiscoveryCategory);
// Register our RegistryLocator interface with the Service Locator.
- Ice::ObjectPrx locatorObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RegistryLocatorObjectId));
+ ::Ice::ObjectPrx locatorObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(RegistryLocatorObjectId));
LocatorRegistryPrx locatorRegistryPrx = LocatorRegistryPrx::checkedCast(locatorObjectPrx);
- string locatorServiceGuid(getImplementationId() + "." + Routing::V1::RoutingServiceLocatorRegistryDiscoveryCategory); // Should be unique for reporting.
+ string locatorServiceGuid(mImplementationId + "." + Routing::V1::RoutingServiceLocatorRegistryDiscoveryCategory); // Should be unique for reporting.
mRegistryLocatorManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(locatorRegistryPrx, locatorServiceGuid));
setCategory(mRegistryLocatorManagement, Routing::V1::RoutingServiceLocatorRegistryDiscoveryCategory);
- // Register the ComponentService interface with the Service Locator.
- Ice::ObjectPrx componentServiceObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(ComponentServiceId));
- ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
- string componentServiceGuid(getImplementationId() + "." + Routing::V1::ComponentServiceDiscoveryCategory); // Should be unique for reporting.
- mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
-
- setCategory(mComponentServiceManagement, Routing::V1::ComponentServiceDiscoveryCategory);
-
- if (mPublishTestInterface)
- {
- // Register our test servant as a facet of the ComponentService interface.
- mAdapter->addFacet(mComponentTest, componentServicePrx->ice_getIdentity(), AsteriskSCF::System::Component::V1::ComponentTestFacet);
- }
-
// Register the SessionRouter interface with the Service Locator.
- Ice::ObjectPrx sessionRouterObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(SessionRouterObjectId));
+ ::Ice::ObjectPrx sessionRouterObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(SessionRouterObjectId));
AsteriskSCF::SessionCommunications::V1::SessionRouterPrx sessionRouterPrx = AsteriskSCF::SessionCommunications::V1::SessionRouterPrx::checkedCast(sessionRouterObjectPrx);
- string sessionRouterGuid(getImplementationId() + "." + Routing::V1::SessionRouterDiscoveryCategory); // Should be unique
+ string sessionRouterGuid(mImplementationId + "." + Routing::V1::SessionRouterDiscoveryCategory); // Should be unique
mSessionRouterManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(sessionRouterPrx, sessionRouterGuid));
setCategory(mSessionRouterManagement, Routing::V1::SessionRouterDiscoveryCategory);
+
+ if (includeComponentService)
+ {
+ // Register the ComponentService interface with the Service Locator.
+ // Note that this interface has it's own adapter.
+ ::Ice::ObjectPrx componentServiceObjectPrx = mComponentServiceAdapter->createDirectProxy(mCommunicator->stringToIdentity(ComponentServiceId));
+ ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
+ string componentServiceGuid(mImplementationId + "." + Routing::V1::ComponentServiceDiscoveryCategory); // Should be unique for reporting.
+ mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
+
+ setCategory(mComponentServiceManagement, Routing::V1::ComponentServiceDiscoveryCategory);
+
+ if (mPublishTestInterface)
+ {
+ // Register our test servant as a facet of the ComponentService interface.
+ mComponentServiceAdapter->addFacet(mComponentTest, componentServicePrx->ice_getIdentity(), AsteriskSCF::System::Component::V1::ComponentTestFacet);
+ }
+ }
}
catch(const std::exception& e)
{
@@ -424,15 +438,23 @@ void BasicRoutingServiceApp::registerWithServiceLocator()
* Deregister this component's primary public interfaces from the Service Locator.
* This is done at shutdown, and whenever we want to keep other services from locating
* our interfaces.
+ *
+ * @param includeComponentService If true, deregisters our ComponentService interface
+ * in addition to all our other interfaces, making this component unreachable from the
+ * rest of Asterisk SCF. Should only be done during shutdown.
*/
-void BasicRoutingServiceApp::deregisterFromServiceLocator()
+void BasicRoutingServiceApp::deregisterFromServiceLocator(bool includeComponentService)
{
try
{
mRegistryLocatorManagement->unregister();
mAdminManagement->unregister();
- mComponentServiceManagement->unregister();
mSessionRouterManagement->unregister();
+
+ if (includeComponentService)
+ {
+ mComponentServiceManagement->unregister();
+ }
}
catch(...)
{
@@ -461,7 +483,7 @@ void BasicRoutingServiceApp::locateBridgeManager()
/**
* Locate our State Replicator using the Service Locator.
*/
-void BasicRoutingServiceApp::locateStateReplicator(bool isActive)
+void BasicRoutingServiceApp::locateStateReplicator()
{
BasicRoutingService::V1::RoutingStateReplicatorParamsPtr replicatorParams = new BasicRoutingService::V1::RoutingStateReplicatorParams();
replicatorParams->category = BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
@@ -480,9 +502,14 @@ void BasicRoutingServiceApp::initialize()
{
try
{
- // Create the adapter.
+ // Create the primary adapter.
mAdapter = mCommunicator->createObjectAdapter(mAppName);
+ // Create a separate adapter just for the component service. This is
+ // to make it easy to deactivate all of our interfaces other than the
+ // component service for suspend()/resume().
+ mComponentServiceAdapter = mCommunicator->createObjectAdapter(mAppName + ".ComponentService");
+
bool isActive = !(mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".Standby", "no") == "yes");
// Create the replication context.
@@ -528,13 +555,13 @@ void BasicRoutingServiceApp::initialize()
mAdminInteface = new RoutingAdmin(mEndpointRegistry);
mAdapter->add(mAdminInteface, mCommunicator->stringToIdentity(RoutingAdminObjectId));
- // Create and publish the ComponentService interface.
- mComponentService = new ComponentServiceImpl(*this);
- mAdapter->add(mComponentService, mCommunicator->stringToIdentity(ComponentServiceId));
+ // Create and publish the ComponentService interface on it's own dedicated adapter.
+ mComponentService = new ComponentServiceImpl(this);
+ mComponentServiceAdapter->add(mComponentService, mCommunicator->stringToIdentity(ComponentServiceId));
// Create and publish our Replica interface support. This interface allows this component
// to be activated or placed in standby mode.
- mReplicaManagement = new ReplicaManagement(*this, mAdapter);
+ mReplicaManagement = new ReplicaManagement(this, mAdapter);
mAdapter->add(mReplicaManagement, mCommunicator->stringToIdentity(ReplicaServiceId));
// Create and publish our state replicator listener interface.
@@ -551,13 +578,19 @@ void BasicRoutingServiceApp::initialize()
}
mAdapter->activate();
+ mComponentServiceAdapter->activate();
+
+ // Get a proxy to the management interface for the Service Locator manager.
+ // This isso we can add ourselves into the system discovery mechanisms.
+ mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("LocatorServiceManagement.Proxy"));
// Get a proxy to the interface for the Service Locator.
+ // This is so we can find the other components we depend on.
mServiceLocator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
mInitialized = true;
}
- catch(const Ice::Exception &e)
+ catch(const ::Ice::Exception &e)
{
lg(Error) << "Problems in " << mAppName << BOOST_CURRENT_FUNCTION << e.what();
throw e;
@@ -565,13 +598,13 @@ void BasicRoutingServiceApp::initialize()
locateBridgeManager();
- locateStateReplicator(mReplicationContext->isComponentActive());
+ locateStateReplicator();
}
/**
* Implementation of the required IceBox::Service start method.
*/
-void BasicRoutingServiceApp::start(const string& name, const Ice::CommunicatorPtr& communicator, const Ice::StringSeq& args)
+void BasicRoutingServiceApp::start(const string& name, const ::Ice::CommunicatorPtr& communicator, const ::Ice::StringSeq& args)
{
lg(Info) << "Starting...";
@@ -585,15 +618,14 @@ void BasicRoutingServiceApp::start(const string& name, const Ice::CommunicatorPt
else
{
mAdapter->activate();
+ mComponentServiceAdapter->activate();
}
- // Plug back into the Asterisk SCF discovery system so that the interfaces we provide
+ // Plug into the Asterisk SCF discovery system so that the interfaces we provide
// can be located.
- registerWithServiceLocator();
+ registerWithServiceLocator(true);
// Register with the state replicator in case we are in standby mode.
- // This is done here because during initialize(), the reference to the State Replicator isn't
- // yet available when Donatoe servant is created.
listenToStateReplicator();
mRunning = true;
@@ -607,19 +639,51 @@ void BasicRoutingServiceApp::resume()
{
if (!mRunning)
{
- mAdapter->activate();
+ // If we're in standby by mode, listen for state replication.
+ listenToStateReplicator();
// Plug back into the Asterisk SCF discovery system so that the interfaces we provide
// can be located.
- registerWithServiceLocator();
+ registerWithServiceLocator(false);
+
+ // Reactivate the primary adapter.
+ mAdapter->activate();
}
mRunning = true;
}
+/**
+ * Utility function to suspend the service for a suspend() or stop().
+ */
+void BasicRoutingServiceApp::suspendService(bool shuttingDown)
+{
+ if (mRunning)
+ {
+ // Deregister our servants.
+ deregisterFromServiceLocator(shuttingDown);
+
+ // Remove our interfaces from the state replicator.
+ stopListeningToStateReplicator();
+
+ // Deactive the primary adapter.
+ // The adapter that services the ComponentService stays active.
+ mAdapter->deactivate();
+ }
+
+ mRunning = false;
+}
+
+/**
+ * Handle a notice from the ComponentService.
+ */
void BasicRoutingServiceApp::suspend()
{
- stop();
+ lg(Info) << "Suspending...";
+
+ suspendService(false);
+
+ lg(Info) << "Suspended.";
}
/**
@@ -629,32 +693,25 @@ void BasicRoutingServiceApp::stop()
{
lg(Info) << "Stopping...";
- if (mRunning)
- {
- deregisterFromServiceLocator();
-
- // Remove our interfaces from the state replicator.
- stopListeningToStateReplicator();
+ suspendService(true);
- mAdapter->deactivate();
- }
+ // Just in case we were in suspend() mode when told to stop.
+ mComponentServiceManagement->unregister();
- //
- // TODO: Needs to destroy communicator if it is not shared.
- //
+ // Turn off our ComponentService interface.
+ // Only a start() directly from IceBox can restart us now.
+ mComponentServiceAdapter->deactivate();
- mRunning = false;
lg(Info) << "Stopped.";
}
-
} // end BasicRoutingService
} // end AsteriskSCF
extern "C"
{
-ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(Ice::CommunicatorPtr communicator)
+ASTERISK_SCF_ICEBOX_EXPORT IceBox::Service* create(::Ice::CommunicatorPtr communicator)
{
return new AsteriskSCF::BasicRoutingService::BasicRoutingServiceApp;
}
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 5d644d8..dff11e3 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -60,30 +60,35 @@ typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
class BasicRoutingStateReplicatorService : public IceBox::Service
{
public:
- BasicRoutingStateReplicatorService() { };
- ~BasicRoutingStateReplicatorService()
- {
- mComponentService = 0;
- mAdapter = 0;
- mStateReplicator = 0;
- };
+ BasicRoutingStateReplicatorService() : mRunning(false), mInitialized(false) {};
+ ~BasicRoutingStateReplicatorService() {};
+
+ void suspend();
+ void resume();
+
+ ////// Overrides of IceBox::Service methods.
virtual void start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
virtual void stop();
private:
- void initialize(std::string appName, const Ice::CommunicatorPtr& ic);
- void registerWithServiceLocator(const Ice::CommunicatorPtr& ic);
- void deregisterFromServiceLocator();
+ void suspendService(bool shuttingDown);
+ void initialize(const std::string& appName);
+ void registerWithServiceLocator();
+ void deregisterFromServiceLocator(bool includeComponentService);
std::string mAppName;
- //vector<BasicRoutingStateReplicatorListenerPrx> mListeners;
- Ice::ObjectAdapterPtr mAdapter;
+
+ bool mRunning;
+ bool mInitialized;
+ ::Ice::ObjectAdapterPtr mAdapter;
+ ::Ice::ObjectAdapterPtr mComponentServiceAdapter;
+ ::Ice::CommunicatorPtr mCommunicator;
ServiceLocatorManagementPrx mServiceLocatorManagement;
Discovery::V1::ServiceManagementPrx mComponentServiceManagement;
Discovery::V1::ServiceManagementPrx mStateReplicationManagement;
ConfiguredIceLoggerPtr mIceLogger;
- ComponentServicePtr mComponentService;
AsteriskSCF::BasicRoutingService::RoutingStateReplicatorIPtr mStateReplicator;
};
+typedef ::IceUtil::Handle<BasicRoutingStateReplicatorService> BasicRoutingStateReplicatorServicePtr;
static const string ComponentServiceId("BasicRoutingStateReplicatorComponent");
static const string ServiceDiscoveryId("BasicRoutingStateReplicatorService");
@@ -95,26 +100,26 @@ static const string ServiceDiscoveryId("BasicRoutingStateReplicatorService");
class ComponentServiceImpl : public ComponentService
{
public:
- ComponentServiceImpl(BasicRoutingStateReplicatorService &service) : mService(service) {}
+ ComponentServiceImpl(BasicRoutingStateReplicatorService* service) : mService(service) {}
public: // Overrides of the ComponentService interface.
- virtual void suspend(const ::Ice::Current& = ::Ice::Current())
+ virtual void suspend(const ::Ice::Current&)
{
- // TBD... focussed more on the component than the replicator.
+ mService->suspend();
}
- virtual void resume(const ::Ice::Current& = ::Ice::Current())
+ virtual void resume(const ::Ice::Current&)
{
- // TBD...
+ mService->resume();
}
- virtual void shutdown(const ::Ice::Current& = ::Ice::Current())
+ virtual void shutdown(const ::Ice::Current&)
{
- mService.stop();
+ mService->stop();
}
private:
- BasicRoutingStateReplicatorService& mService;
+ BasicRoutingStateReplicatorServicePtr mService;
};
class BasicRoutingStateReplicatorCompare : public ServiceLocatorParamsCompare
@@ -137,37 +142,36 @@ private:
typedef IceUtil::Handle<BasicRoutingStateReplicatorCompare> BasicRoutingStateReplicatorComparePtr;
/**
+ * Helper function to add some parameters to one of our registered interfaces in the ServiceLocator, so that
+ * other components can look up our interfaces.
+ */
+void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement, const string& category)
+{
+ // Add category as a parameter to enable other components look this component up.
+ ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams;
+ genericparams->category = category;
+ serviceManagement->addLocatorParams(genericparams, "");
+}
+
+/**
* Register this component's primary public interfaces with the Service Locator.
* This enables other Asterisk SCF components to locate our interfaces.
*/
-void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::CommunicatorPtr& ic)
+void BasicRoutingStateReplicatorService::registerWithServiceLocator()
{
try
{
- // Get a proxy to the management interface for the Service Locator, so we can add ourselves into the system discovery mechanisms.
- mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(ic->propertyToProxy("LocatorServiceManagement.Proxy"));
-
- if (mServiceLocatorManagement == 0)
- {
- lg(Error) << "Unable to obtain proxy to ServiceLocatorManagement interface. Check config file. This component can't be found until this is corrected." << endl;
- return;
- }
-
// Get a proxy to our ComponentService interface and add it to the Service Locator.
- Ice::ObjectPrx componentServiceObjectPrx = mAdapter->createDirectProxy(ic->stringToIdentity(ComponentServiceId));
+ // Note that this interface has its own adapter.
+ Ice::ObjectPrx componentServiceObjectPrx = mComponentServiceAdapter->createDirectProxy(mCommunicator->stringToIdentity(ComponentServiceId));
ComponentServicePrx componentServicePrx = ComponentServicePrx::checkedCast(componentServiceObjectPrx);
- // The GUID passed in to add service needs to be unique for reporting.
string componentServiceGuid(AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory);
mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(mServiceLocatorManagement->addService(componentServicePrx, componentServiceGuid));
+ setCategory(mComponentServiceManagement, AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory);
- // Add category as a parameter to enable other components look this component up.
- ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
- genericparams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorComponentCategory;
-
- mComponentServiceManagement->addLocatorParams(genericparams, "");
-
- Ice::ObjectPrx stateReplicatorObjectPrx = mAdapter->createDirectProxy(ic->stringToIdentity(ServiceDiscoveryId));
+ // Get a proxy to our Replicator interface and add it to the Service Locator.
+ Ice::ObjectPrx stateReplicatorObjectPrx = mAdapter->createDirectProxy(mCommunicator->stringToIdentity(ServiceDiscoveryId));
RoutingStateReplicatorPrx stateReplicatorPrx = RoutingStateReplicatorPrx::checkedCast(stateReplicatorObjectPrx);
string stateReplicationGuid(AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory);
@@ -176,18 +180,18 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
ServiceLocatorParamsPtr discoveryParams = new ServiceLocatorParams();
discoveryParams->category = AsteriskSCF::BasicRoutingService::V1::StateReplicatorDiscoveryCategory;
- string replicatorName = ic->getProperties()->getPropertyWithDefault(mAppName + ".InstanceName", "default");
- BasicRoutingStateReplicatorCompare* nameCompare = new BasicRoutingStateReplicatorCompare(replicatorName);
- ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(mAdapter->addWithUUID(nameCompare));
+ string replicatorName = mCommunicator->getProperties()->getPropertyWithDefault(mAppName + ".InstanceName", "default");
+ ServiceLocatorParamsComparePrx compareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+ mAdapter->addWithUUID(new BasicRoutingStateReplicatorCompare(replicatorName)));
string compareGuid = IceUtil::generateUUID();
mServiceLocatorManagement->addCompare(compareGuid, compareProxy);
mStateReplicationManagement->addLocatorParams(discoveryParams, compareGuid);
-
}
catch(...)
{
- lg(Error) << "Exception in " << mAppName << " registerWithServiceLocator()" << endl;
+ lg(Error) << "Exception in " << mAppName << BOOST_CURRENT_FUNCTION << endl;
+ throw;
}
}
@@ -195,52 +199,152 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
* Deregister this component's primary public interfaces from the Service Locator.
* This is done at shutdown, and whenever we want to keep other services from locating
* our interfaces.
+ *
+ * @param includeComponentService If true, deregisters our ComponentService interface
+ * in addition to all our other interfaces, making this component unreachable from the
+ * rest of Asterisk SCF. Should only be done during shutdown.
*/
-void BasicRoutingStateReplicatorService::deregisterFromServiceLocator()
+void BasicRoutingStateReplicatorService::deregisterFromServiceLocator(bool includeComponentService)
{
try
{
- mComponentServiceManagement->unregister();
+ mStateReplicationManagement->unregister();
+
+ if (includeComponentService)
+ {
+ mComponentServiceManagement->unregister();
+ }
}
catch(...)
{
- lg(Error) << "Exception in deregisterFromServiceLocator()." << endl;
+ lg(Error) << "Exception in " << BOOST_CURRENT_FUNCTION << endl;
}
}
-void BasicRoutingStateReplicatorService::initialize(const std::string appName, const Ice::CommunicatorPtr& ic)
+void BasicRoutingStateReplicatorService::initialize(const std::string& appName)
{
- mAdapter = ic->createObjectAdapter(appName + ".RoutingReplicator");
+ mAppName = appName;
+
+ // This is the primary adapter for this component.
+ mAdapter = mCommunicator->createObjectAdapter(mAppName);
+
+ // Create a separate adapter just for the component service. This is
+ // to make it easy to deactivate all of our interfaces other than the
+ // component service to support suspend()/resume().
+ mComponentServiceAdapter = mCommunicator->createObjectAdapter(mAppName + ".ComponentService");
// setup logging client
mIceLogger = createIceLogger(mAdapter);
getLoggerFactory().setLogOutput(mIceLogger->getLogger());
- mAppName = appName;
-
- // Create and publish our ComponentService interface support.
- mComponentService = new ComponentServiceImpl(*this);
- mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
+ // Create and publish our ComponentService interface support on its own adapter.
+ mComponentServiceAdapter->add(new ComponentServiceImpl(this), mCommunicator->stringToIdentity(ComponentServiceId));
// Create our instance of the StateReplicator template.
mStateReplicator = new RoutingStateReplicatorI();
- mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
+ mAdapter->add(mStateReplicator, mCommunicator->stringToIdentity(ServiceDiscoveryId));
+ // Activate our adapters.
mAdapter->activate();
+ mComponentServiceAdapter->activate();
+
+ // Get a proxy to the management interface for the Service Locator, so we can add
+ // ourselves into the system discovery mechanisms.
+ mServiceLocatorManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("LocatorServiceManagement.Proxy"));
+
+ if (mServiceLocatorManagement == 0)
+ {
+ lg(Error) << "Unable to obtain proxy to ServiceLocatorManagement interface. Check config file. " << BOOST_CURRENT_FUNCTION << endl;
+ }
+
+ mInitialized = true;
}
void BasicRoutingStateReplicatorService::start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args)
{
- initialize(name, ic);
+ mCommunicator = ic;
+
+ if (!mInitialized)
+ {
+ initialize(name);
+ }
+ else
+ {
+ mAdapter->activate();
+ mComponentServiceAdapter->activate();
+ }
+
// Plug into the Asterisk SCF discovery system so that the interfaces we provide
// can be located.
- registerWithServiceLocator(ic);
+ registerWithServiceLocator();
+
+ mRunning = true;
+}
+
+void BasicRoutingStateReplicatorService::resume()
+{
+ if (!mRunning)
+ {
+ mAdapter->add(mStateReplicator, mCommunicator->stringToIdentity(ServiceDiscoveryId));
+
+ // Plug back into the Asterisk SCF discovery system so that the interfaces we provide
+ // can be located.
+ registerWithServiceLocator();
+
+ // Reactivate the primary adapter.
+ mAdapter->activate();
+ }
+
+ mRunning = true;
+}
+
+/**
+ * Utility function to suspend the service for a suspend() or stop().
+ */
+void BasicRoutingStateReplicatorService::suspendService(bool shuttingDown)
+{
+ if (mRunning)
+ {
+ // Deregister our servants.
+ deregisterFromServiceLocator(shuttingDown);
+
+ // Deactive the primary adapter.
+ // The adapter that services the ComponentService stays active.
+ mAdapter->deactivate();
+ }
+
+ mRunning = false;
+}
+
+/**
+ * Handle a notice from the ComponentService.
+ */
+void BasicRoutingStateReplicatorService::suspend()
+{
+ lg(Info) << "Suspending...";
+
+ suspendService(false);
+
+ lg(Info) << "Suspended.";
}
+/**
+ * Implementation of the required IceBox::Service stop method.
+ */
void BasicRoutingStateReplicatorService::stop()
{
- // Remove our interfaces from the service locator.
- deregisterFromServiceLocator();
+ lg(Info) << "Stopping...";
+
+ suspendService(true);
+
+ // Just in case we were in suspend() mode when told to stop.
+ mComponentServiceManagement->unregister();
+
+ // Turn off our ComponentService interface.
+ // Only a start() directly from IceBox can restart us now.
+ mComponentServiceAdapter->deactivate();
+
+ lg(Info) << "Stopped.";
}
extern "C"
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 30b951b..f5e52d9 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -106,7 +106,6 @@ public:
mEndpointLocatorMap.erase(locatorId);
}
-
void insertLocatorMapItem(const string& key, const RegisteredLocator& locator)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -239,6 +238,18 @@ public:
mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
}
+ /**
+ * Utiltity to do thread-safe check for existing map entry.
+ */
+ bool locatorExists(const std::string& locatorId, EndpointLocatorMapConstIterator& existing)
+ {
+ EndpointLocatorMapConstIterator end;
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+ existing = mEndpointLocatorMap.find(locatorId);
+ return existing != mEndpointLocatorMap.end();
+ }
+
boost::shared_mutex mLock;
ScriptProcessorPtr mScriptProcessor;
@@ -258,8 +269,11 @@ class LookupResultCollector : public IceUtil::Shared
public:
/**
* Constructor.
- * @param cb Ice callback.
- * @param numVotes The number of times isSupported will be called.
+ * @param callback AMD callback for the operation that
+ * initiated this lookup.
+ * @param destinatin The destination being looked up.
+ * @param eventPublisher
+ * @param numVotes The number of replies expected.
*/
LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback,
const std::string& destination,
@@ -285,6 +299,7 @@ public:
/**
* Collect results of AMI lookups from multiple EndpointLocators.
+ * We only care about the first reply that gives us endpoints.
*/
void lookupResult(const EndpointSeq& endpoints)
{
@@ -303,7 +318,7 @@ public:
mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
}
- assert(mNumVotes > 0); // isSupported was called too many times
+ assert(mNumVotes > 0);
if (--mNumVotes == 0 && mCallback)
{
@@ -418,27 +433,6 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
}
/**
- * Non-member utiltity to do thread-safe check for existing map entry.
- */
-bool locatorExists(const EndpointLocatorMap& map, const std::string& locatorId, boost::shared_mutex& mtx, EndpointLocatorMapConstIterator& existing)
-{
- EndpointLocatorMapConstIterator end;
-
- { // critical scope
- boost::shared_lock<boost::shared_mutex> lock(mtx);
-
- existing = map.find(locatorId);
- end = map.end();
- }
-
- if (existing == end)
- {
- return false;
- }
- return true;
-}
-
-/**
* Register an EndpointLocator that can provide endpoints.
* @param id A unique identifier for the added EndpointLocator.
* @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
@@ -452,7 +446,7 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString() << std::endl;
EndpointLocatorMapIterator existing;
- bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
+ bool exists = mImpl->locatorExists( locatorId, existing);
if (exists)
{
@@ -484,7 +478,7 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
EndpointLocatorMapIterator existing;
- bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
+ bool exists = mImpl->locatorExists(locatorId, existing);
if (!exists)
{
@@ -518,7 +512,7 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
try
{
EndpointLocatorMapIterator existing;
- bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
+ bool exists = mImpl->locatorExists(locatorId, existing);
if (!exists)
{
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index a79d5db..a0c5866 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -28,44 +28,20 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
-typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
-
-/**
- * For each transaction id, we're going to cache the all the state items for the operation.
- * The reason for this is that we don't want to rely on the order in which we will receive the state updates.
- * This class is used to hold all the state updates for a given operation.
- *
- * When an item is called for from the cache, we'll apply all the available state updates we have,
- * in order, up to the highest we have (without missing a state, since each state depends on the results
- * of the previous.)
- */
-template<typename T>
-struct OperationReplicaItem
-{
-public:
- T mOperation;
- StateItemMapType mItems;
-};
-
-typedef std::map<std::string, OperationReplicaItem<RouteSessionOperationPtr> > RouteSessionMapType;
-typedef std::map<std::string, OperationReplicaItem<ConnectBridgedSessionsWithDestinationOperationPtr> > ConnectBridgeWithDestMapType;
-
class OperationReplicaCachePriv
{
public:
OperationReplicaCachePriv(const SessionContextPtr& sessionContext)
- : mSessionContext(sessionContext)
+ : mSessionContext(sessionContext),
+ mRouteSessionCache(new OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation>(sessionContext)),
+ mConnectBridgedSessionsWithDestCache(new OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation>(sessionContext))
{
}
SessionContextPtr mSessionContext;
-
- RouteSessionMapType routeSessionReplicas;
- ConnectBridgeWithDestMapType connectBridgedWithDestReplicas;
-
- boost::shared_mutex mRouteSessionLock;
- boost::shared_mutex mConnectBridgedWithDestLock;
+ boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation> > mRouteSessionCache;
+ boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation> > mConnectBridgedSessionsWithDestCache;
};
OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionContext)
@@ -73,166 +49,20 @@ OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionCon
{
}
-/**
- * For a specific type of operation, cache the update item.
- */
-template<typename M, typename O, typename OPtr>
-void cacheIt(M& map, const OperationStateItemPtr& item, const SessionContextPtr& context)
-{
- // See if this transaction is in the cache.
- M::iterator i = map.find(item->transactionId);
- if (i == map.end())
- {
- // Add an entry to the cache.
- OperationReplicaItem<OPtr> replica;
- map[item->transactionId] = replica;
-
- i = map.find(item->transactionId);
- }
- (*i).second.mItems[item->key] = item;
-
- // If we haven't created the replica yet, do so now.
- if ((*i).second.mOperation.get() == 0)
- {
- OPtr work(O::createReplica(context));
- (*i).second.mOperation = work;
- }
-
- // Update the replicated object with newest state update.
- (*i).second.mOperation->reflectUpdate(item);
-}
-
-void OperationReplicaCache::cacheOperation(OperationType type, const OperationStateItemPtr& item)
-{
- switch(type)
- {
- case ROUTE_SESSION_OP:
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-
- cacheIt<RouteSessionMapType,
- RouteSessionOperation,
- RouteSessionOperationPtr>(mPriv->routeSessionReplicas,
- item,
- mPriv->mSessionContext);
- }
- break;
-
- case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-
- cacheIt<ConnectBridgeWithDestMapType,
- ConnectBridgedSessionsWithDestinationOperation,
- ConnectBridgedSessionsWithDestinationOperationPtr>(mPriv->connectBridgedWithDestReplicas,
- item,
- mPriv->mSessionContext); }
- break;
-
- case CONNECT_BRIDGED_SESSIONS_OP:
- // Not replicating this type.
- break;
- }
-}
-
-/**
- * For a specific type of operation, fetch a cached entry if one is available.
- */
-template<typename M, typename OPtr>
-bool fetchIt(M& map, const std::string& transactionId, OPtr& ref)
-{
- M::iterator i = map.find(transactionId);
- if (i == map.end())
- {
- return false;
- }
-
- ref = (*i).second.mOperation;
- map.erase(i);
-
- ref->fastForwardReplica();
-
- return true;
-}
-
-bool OperationReplicaCache::fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref)
-{
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-
- return fetchIt<RouteSessionMapType,
- RouteSessionOperationPtr> (mPriv->routeSessionReplicas, transactionId, ref);
-}
-
-bool OperationReplicaCache::fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref)
-{
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-
- return fetchIt<ConnectBridgeWithDestMapType,
- ConnectBridgedSessionsWithDestinationOperationPtr> (mPriv->connectBridgedWithDestReplicas, transactionId, ref);
-}
-
-void OperationReplicaCache::dropOperation(OperationType type, std::string transactionId)
+void OperationReplicaCache::clearCache()
{
-
- switch(type)
- {
- case ROUTE_SESSION_OP:
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
-
- RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
- if (i != mPriv->routeSessionReplicas.end())
- {
- mPriv->routeSessionReplicas.erase(i);
- }
- }
- break;
-
- case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
-
- ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
- if (i != mPriv->connectBridgedWithDestReplicas.end())
- {
- mPriv->connectBridgedWithDestReplicas.erase(i);
- }
- }
- break;
- }
+ mPriv->mRouteSessionCache->clear();
+ mPriv->mConnectBridgedSessionsWithDestCache->clear();
}
-void OperationReplicaCache::clearCache(OperationType type)
+boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation> > OperationReplicaCache::getRouteSessionCache()
{
- switch(type)
- {
- case ROUTE_SESSION_OP:
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
- mPriv->routeSessionReplicas.clear();
- }
- break;
-
- case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
- mPriv->connectBridgedWithDestReplicas.clear();
- }
- break;
- }
+ return mPriv->mRouteSessionCache;
}
-void OperationReplicaCache::clearCache()
+boost::shared_ptr< OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation> > OperationReplicaCache::getConnectBridgedSessionsWithDestCache()
{
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mRouteSessionLock);
- mPriv->routeSessionReplicas.clear();
- }
-
- {
- boost::unique_lock<boost::shared_mutex> lock(mPriv->mConnectBridgedWithDestLock);
- mPriv->connectBridgedWithDestReplicas.clear();
- }
+ return mPriv->mConnectBridgedSessionsWithDestCache;
}
} // end BasicRoutingService
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index 0424881..5638006 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -31,27 +31,124 @@ namespace AsteriskSCF
namespace BasicRoutingService
{
-enum OperationType
-{
- ROUTE_SESSION_OP = 0,
- CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP = 1,
- CONNECT_BRIDGED_SESSIONS_OP = 2
-};
+template<typename O>
+class OperationCachePriv;
class OperationReplicaCachePriv;
class SessionContext;
+typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
+
+/**
+ * For each transaction id, we're going to cache the all the state items for the operation.
+ * The reason for this is that we don't want to rely on the order in which we will receive the state updates.
+ * This class is used to hold all the state updates for a given operation.
+ *
+ * When an item is called for from the cache, we'll apply all the available state updates we have,
+ * in order, up to the highest we have (without missing a state, since each state depends on the results
+ * of the previous.) The operation must have fastForwardReplica(...) operation to support this.
+ */
+template<typename T>
+struct OperationReplicaItem
+{
+public:
+ T mOperation;
+ StateItemMapType mItems;
+};
+
+/**
+ * Template to cache some type of operation.
+ */
+template<typename O>
+class OperationCache
+{
+public:
+
+ typedef std::map<std::string, OperationReplicaItem<boost::shared_ptr<O>>> OpMapType;
+
+ OperationCache(const SessionContextPtr& sessionContext)
+ : mSessionContext(sessionContext)
+ {
+ }
+
+ void cacheOperationState(const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ // See if this transaction is in the cache.
+ OpMapType::iterator i = mReplicas.find(item->transactionId);
+ if (i == mReplicas.end())
+ {
+ // Add an entry to the cache.
+ OperationReplicaItem< boost::shared_ptr<O> > replica;
+ mReplicas[item->transactionId] = replica;
+
+ i = mReplicas.find(item->transactionId);
+ }
+
+ // Add this item to the replica's collection of state items.
+ (*i).second.mItems[item->key] = item;
+
+ // If we haven't created the replica yet, do so now.
+ if ((*i).second.mOperation.get() == 0)
+ {
+ boost::shared_ptr<O> workPtr(O::createReplica(mSessionContext));
+ (*i).second.mOperation = workPtr;
+ }
+
+ // Update the replicated object with newest state update.
+ (*i).second.mOperation->reflectUpdate(item);
+ }
+
+ bool fetchOperation(std::string transactionId, boost::shared_ptr<O>& outRef)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ OpMapType::iterator i = mReplicas.find(transactionId);
+ if (i == mReplicas.end())
+ {
+ return false;
+ }
+
+ outRef = (*i).second.mOperation;
+ mReplicas.erase(i);
+
+ outRef->fastForwardReplica();
+
+ return true;
+ }
+
+ void dropOperation(std::string transactionId)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ OpMapType::iterator i = mReplicas.find(transactionId);
+ if (i != mReplicas.end())
+ {
+ mReplicas.erase(i);
+ }
+ }
+
+ void clear()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mReplicas.clear();
+ }
+
+private:
+ SessionContextPtr mSessionContext;
+ OpMapType mReplicas;
+ boost::shared_mutex mLock;
+};
+
class OperationReplicaCache
{
public:
OperationReplicaCache(const boost::shared_ptr<SessionContext>& sessionContext);
- void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
- bool fetchRouteSessionOp(std::string transactionId, AsteriskSCF::BasicRoutingService::RouteSessionOperationPtr& ref);
- bool fetchConnectBridgedSessionsWithDestOp(std::string transactionId, AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperationPtr& ref);
- void dropOperation(OperationType type, std::string transactionId);
+ boost::shared_ptr< AsteriskSCF::BasicRoutingService::OperationCache<AsteriskSCF::BasicRoutingService::RouteSessionOperation> > getRouteSessionCache();
+ boost::shared_ptr< AsteriskSCF::BasicRoutingService::OperationCache<AsteriskSCF::BasicRoutingService::ConnectBridgedSessionsWithDestinationOperation> > getConnectBridgedSessionsWithDestCache();
- void clearCache(OperationType type);
void clearCache();
private:
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index c62cee1..f0d9e32 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -66,7 +66,7 @@ public:
void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
{
// The operation cache keeps all the collected state for an operation under the transaction id.
- mImpl->mOperationReplicaCache->dropOperation(ROUTE_SESSION_OP, opState->transactionId);
+ mImpl->mOperationReplicaCache->getRouteSessionCache()->dropOperation(opState->transactionId);
}
void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
@@ -81,7 +81,7 @@ public:
void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
{
- mImpl->mOperationReplicaCache->dropOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState->transactionId);
+ mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->dropOperation(opState->transactionId);
}
void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
@@ -131,32 +131,32 @@ public:
void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr& opState)
{
- mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
+ mImpl->mOperationReplicaCache->getRouteSessionCache()->cacheOperationState(opState);
}
void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& opState)
{
- mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
+ mImpl->mOperationReplicaCache->getRouteSessionCache()->cacheOperationState(opState);
}
void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr& opState)
{
- mImpl->mOperationReplicaCache->cacheOperation(ROUTE_SESSION_OP, opState);
+ mImpl->mOperationReplicaCache->getRouteSessionCache()->cacheOperationState(opState);
}
void visitConnectBridgedSessionsWithDestinationOpStart(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
{
- mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
+ mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->cacheOperationState(opState);
}
void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& opState)
{
- mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
+ mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->cacheOperationState(opState);
}
void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr& opState)
{
- mImpl->mOperationReplicaCache->cacheOperation(CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP, opState);
+ mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->cacheOperationState(opState);
}
void visitEndpointLocatorState(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorStatePtr& item)
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 9e87e48..c5909ab 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -180,7 +180,7 @@ void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunication
// Check the cache for a replica with this transaction Id.
RouteSessionOperationPtr routeSessionOp;
- if (mImpl->mOperationReplicaCache->fetchRouteSessionOp(transactionId, routeSessionOp))
+ if (mImpl->mOperationReplicaCache->getRouteSessionCache()->fetchOperation(transactionId, routeSessionOp))
{
routeSessionOp->rehostReplica(cb, current, mImpl.get());
WorkPtr replicaOp(routeSessionOp);
@@ -214,7 +214,7 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const ::Asterisk
// Check the cache for a replica with this transaction Id.
ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
- if (mImpl->mOperationReplicaCache->fetchConnectBridgedSessionsWithDestOp(transactionId, connectBridgedSessionsWithDestOp))
+ if (mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->fetchOperation(transactionId, connectBridgedSessionsWithDestOp))
{
connectBridgedSessionsWithDestOp->rehostReplica(cb, current, mImpl.get());
WorkPtr replicaOp(connectBridgedSessionsWithDestOp);
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list