[asterisk-scf-commits] asterisk-scf/integration/servicediscovery.git branch "retry_deux" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed May 2 18:23:04 CDT 2012
branch "retry_deux" has been updated
via 506a284ecd1d4c1870ea13be46979bcf1b078643 (commit)
from 2cc0fd7b58a2265c2a529362cb0c14d91c1ae2e1 (commit)
Summary of changes:
src/ServiceLocator.cpp | 157 ++++++++++++++++++++++++++++++++++++-----------
1 files changed, 120 insertions(+), 37 deletions(-)
- Log -----------------------------------------------------------------
commit 506a284ecd1d4c1870ea13be46979bcf1b078643
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed May 2 18:22:33 2012 -0500
Enhanced the handling of switching between active and standby mode.
diff --git a/src/ServiceLocator.cpp b/src/ServiceLocator.cpp
index 72abb23..bc2f426 100644
--- a/src/ServiceLocator.cpp
+++ b/src/ServiceLocator.cpp
@@ -65,14 +65,25 @@ public:
const Ice::CommunicatorPtr& communicator,
const Ice::StringSeq& args);
void stop();
+ void onStandby();
+ void activated();
private:
+ void verifyProperties();
+
Ice::ObjectAdapterPtr mLocalAdapter;
Ice::ObjectAdapterPtr mDiscoveryAdapter;
Ice::ObjectAdapterPtr mManagementAdapter;
AsteriskSCF::CollocatedIceStorm::CollocatedIceStormPtr mIceStorm;
ReplicaPtr mReplicaService;
ServiceLocatorStateReplicatorPrx mStateReplicator;
+ ServiceLocatorManagementImplPtr mLocatorServiceManagement;
+ IceStorm::TopicManagerPrx mTopicManager;
+ ServiceLocatorStateReplicatorListenerPrx mReplicatorListenerProxy;
+ bool mStandalone;
+ Ice::CommunicatorPtr mCommunicator;
+ string mBackplaneAdapterName;
+ string mServiceAdapterName;
};
/**
@@ -81,10 +92,11 @@ private:
class ReplicaImpl : public Replica
{
public:
- ReplicaImpl(Ice::ObjectAdapterPtr adapter) :
+ ReplicaImpl(Ice::ObjectAdapterPtr adapter, bool active) :
mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter),
- mActive(true) { }
+ mActive(active),
+ mServiceLocatorApp(0) { }
bool isActive(const Ice::Current&)
{
@@ -113,6 +125,11 @@ public:
listeners = mListeners;
}
+ if (mServiceLocatorApp)
+ {
+ mServiceLocatorApp->activated();
+ }
+
for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
listener != listeners.end();
++listener)
@@ -155,6 +172,11 @@ public:
listeners = mListeners;
}
+ if (mServiceLocatorApp)
+ {
+ mServiceLocatorApp->onStandby();
+ }
+
for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
listener != listeners.end();
++listener)
@@ -195,6 +217,11 @@ public:
mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
}
+ void setServiceLocatorApp(ServiceLocatorApp* serviceLocatorApp)
+ {
+ mServiceLocatorApp = serviceLocatorApp;
+ }
+
private:
boost::shared_mutex mMutex;
@@ -205,6 +232,8 @@ private:
vector<ReplicaListenerPrx> mListeners;
bool mActive;
+
+ ServiceLocatorApp* mServiceLocatorApp; // Plain pointer to avoid smart pointer circular refs.
};
/**
@@ -227,6 +256,7 @@ public:
*/
void locateAll_async(const AMD_ServiceLocator_locateAllPtr&,
const ServiceLocatorParamsPtr&, const ::Ice::Current&);
+
private:
/**
* A pointer to the ServiceManagement implementation as that is where everything is
@@ -256,14 +286,51 @@ void ServiceLocatorImpl::locateAll_async(const AMD_ServiceLocator_locateAllPtr&
mLocatorServiceManagement->locateAll(cb, params);
}
+void ServiceLocatorApp::verifyProperties()
+{
+ const int defaultSize(4);
+ string strDefaultSize = boost::lexical_cast<std::string>(defaultSize);
+
+ Ice::Int defaultPoolSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0);
+ if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mServiceAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+ {
+ if (defaultPoolSize < defaultSize)
+ {
+ lg(Info) << "Configured thread pool size for " << mServiceAdapterName + " is too small, defaulting to " << strDefaultSize;
+ mCommunicator->getProperties()->setProperty(mServiceAdapterName + ".ThreadPool.Size", strDefaultSize);
+ }
+ }
+
+ if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mBackplaneAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+ {
+ if (defaultPoolSize < defaultSize)
+ {
+ lg(Info) << "Configured Internal thread pool size for " << mBackplaneAdapterName + " is too small, defaulting to " << strDefaultSize;
+ mCommunicator->getProperties()->setProperty(mBackplaneAdapterName + ".ThreadPool.Size", strDefaultSize);
+ }
+ }
+
+ defaultPoolSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 0);
+ if (defaultPoolSize < defaultSize)
+ {
+ lg(Warning) << "Client thread pool size is too small, defaulting to " << strDefaultSize;
+ }
+
+}
+
+
void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr& communicator,
const Ice::StringSeq&)
{
+ mCommunicator = communicator;
+
+ verifyProperties();
+
mIceStorm = new AsteriskSCF::CollocatedIceStorm::CollocatedIceStorm(appName, communicator->getProperties());
- string backplaneAdapterName = appName + ".BackplaneAdapter";
- mLocalAdapter = communicator->createObjectAdapterWithEndpoints(backplaneAdapterName,
- communicator->getProperties()->getPropertyWithDefault(backplaneAdapterName + ".Endpoints", "tcp -p 4410"));
+ mBackplaneAdapterName = appName + ".BackplaneAdapter";
+ mLocalAdapter = communicator->createObjectAdapterWithEndpoints(mBackplaneAdapterName,
+ communicator->getProperties()->getPropertyWithDefault(mBackplaneAdapterName + ".Endpoints", "tcp -p 4410"));
ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLocalAdapter);
@@ -275,11 +342,11 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
/* Talk to the topic manager to either create or get the service discovery topic,
* configured or default */
- IceStorm::TopicManagerPrx topicManager = mIceStorm->createTopicManagerProxy(communicator);
+ mTopicManager = mIceStorm->createTopicManagerProxy(communicator);
EventsPrx serviceDiscoveryTopic;
- if (topicManager)
+ if (mTopicManager)
{
Ice::PropertiesPtr props = communicator->getProperties();
string topicName = props->getProperty(appName + ".TopicName");
@@ -292,13 +359,13 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
IceStorm::TopicPrx topic;
try
{
- topic = topicManager->retrieve(topicName);
+ topic = mTopicManager->retrieve(topicName);
}
catch (const IceStorm::NoSuchTopic&)
{
try
{
- topic = topicManager->create(topicName);
+ topic = mTopicManager->create(topicName);
}
catch (const IceStorm::TopicExists&)
{
@@ -315,7 +382,9 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
lg(Info) << "IceStorm topic manager proxy not present, events disabled.";
}
- mReplicaService = new ReplicaImpl(mLocalAdapter);
+ mStandalone = getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false);
+ ReplicaImpl* replicaImpl = new ReplicaImpl(mLocalAdapter, mStandalone);
+ mReplicaService = replicaImpl;
mLocalAdapter->add(mReplicaService, communicator->stringToIdentity(ReplicaServiceId));
/* Management and discovery use separate adapters to provide a level of security,
@@ -323,28 +392,27 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
* into the infrastructure while discovery as a read only function may be allowed to all.
*/
- string managementAdapterName = appName + ".Management.ServiceAdapter";
- mManagementAdapter = communicator->createObjectAdapterWithEndpoints(managementAdapterName,
- communicator->getProperties()->getPropertyWithDefault(managementAdapterName + ".Endpoints", "tcp -p 4412"));
+ mServiceAdapterName = appName + ".Management.ServiceAdapter";
+ mManagementAdapter = communicator->createObjectAdapterWithEndpoints(mServiceAdapterName,
+ communicator->getProperties()->getPropertyWithDefault(mServiceAdapterName + ".Endpoints", "tcp -p 4412"));
- ServiceLocatorManagementImplPtr locatorServiceManagement =
+ mLocatorServiceManagement =
new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic, mReplicaService);
- mManagementAdapter->add(locatorServiceManagement,
+ mManagementAdapter->add(mLocatorServiceManagement,
communicator->stringToIdentity("LocatorServiceManagement"));
mManagementAdapter->activate();
- bool standAlone = getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false);
try
{
mStateReplicator = ServiceLocatorStateReplicatorPrx::checkedCast(communicator->propertyToProxy(appName + ".StateReplicator.Proxy"));
- ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(locatorServiceManagement);
- ServiceLocatorStateReplicatorListenerPrx replicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
+ ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(mLocatorServiceManagement);
+ mReplicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
- locatorServiceManagement->setStateReplicator(mStateReplicator);
+ mLocatorServiceManagement->setStateReplicator(mStateReplicator);
- if (standAlone)
+ if (mStandalone)
{
lg(Info) << "Operating in an active, standalone state and pushing updates." << endl;
}
@@ -352,8 +420,6 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
{
// The default state is standby, in a replica group.
// In this state, the component must be activated via it's Replica interface.
- mReplicaService->standby(AsteriskSCF::Operations::createContext());
-
if (mStateReplicator == 0)
{
lg(Error) << "Operating in replica group with no access to state replicator defined!" << endl;
@@ -361,7 +427,6 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
assert(false);
}
- mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), replicatorListenerProxy);
lg(Info) << "Operating as a standby replica." << endl;
}
}
@@ -381,35 +446,53 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
mDiscoveryAdapter = communicator->createObjectAdapterWithEndpoints(locatorAdapterName,
communicator->getProperties()->getPropertyWithDefault(locatorAdapterName + ".Endpoints", "tcp -p 4411"));
- ServiceLocatorPtr locatorService = new ServiceLocatorImpl(locatorServiceManagement);
+ ServiceLocatorPtr locatorService = new ServiceLocatorImpl(mLocatorServiceManagement);
mDiscoveryAdapter->add(locatorService, communicator->stringToIdentity("LocatorService"));
- mDiscoveryAdapter->addFacet(locatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+ mDiscoveryAdapter->addFacet(mLocatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+
+ lg(Info) << "Publishing service discovery.";
+
+ replicaImpl->setServiceLocatorApp(this);
// Make our IceStorm topic manager available to all if we are the active service. This is because by adding it
// we will replicate it.
if (mReplicaService->isActive() == true)
{
- ServiceManagementPrx icestormManagement = locatorServiceManagement->addService(
- AsteriskSCF::Operations::createContext(), topicManager, "TopicManager", Ice::Current());
- ServiceLocatorParamsPtr params = new ServiceLocatorParams;
- params->category = TopicManagerCategory;
- params->service = "default";
- icestormManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
+ activated();
+ lg(Info) << "Waiting for requests.";
+ return;
}
- mDiscoveryAdapter->activate();
+ onStandby();
+ lg(Info) << "In standby mode.";
- lg(Info) << "Publishing service discovery.";
+ mDiscoveryAdapter->activate();
+}
- if (standAlone)
+void ServiceLocatorApp::activated()
+{
+ if (!mStandalone)
{
- lg(Info) << "Waiting for requests.";
+ // Stop listening to state replicator.
+ mStateReplicator->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
}
- else
+
+ ServiceManagementPrx icestormManagement = mLocatorServiceManagement->addService(
+ AsteriskSCF::Operations::createContext(), mTopicManager, "TopicManager", Ice::Current());
+ ServiceLocatorParamsPtr params = new ServiceLocatorParams;
+ params->category = TopicManagerCategory;
+ params->service = "default";
+ icestormManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
+}
+
+void ServiceLocatorApp::onStandby()
+{
+ if (!mStandalone)
{
- lg(Info) << "In stanby mode.";
+ // Listen to state replicator.
+ mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
}
}
-----------------------------------------------------------------------
--
asterisk-scf/integration/servicediscovery.git
More information about the asterisk-scf-commits
mailing list