[asterisk-scf-commits] asterisk-scf/integration/servicediscovery.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed May 2 18:23:04 CDT 2012


branch "retry_deux" has been updated
       via  506a284ecd1d4c1870ea13be46979bcf1b078643 (commit)
      from  2cc0fd7b58a2265c2a529362cb0c14d91c1ae2e1 (commit)

Summary of changes:
 src/ServiceLocator.cpp |  157 ++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 120 insertions(+), 37 deletions(-)


- Log -----------------------------------------------------------------
commit 506a284ecd1d4c1870ea13be46979bcf1b078643
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed May 2 18:22:33 2012 -0500

    Enhanced the handling of switching between active and standby mode.

diff --git a/src/ServiceLocator.cpp b/src/ServiceLocator.cpp
index 72abb23..bc2f426 100644
--- a/src/ServiceLocator.cpp
+++ b/src/ServiceLocator.cpp
@@ -65,14 +65,25 @@ public:
         const Ice::CommunicatorPtr& communicator,
         const Ice::StringSeq& args);
     void stop();
+    void onStandby();
+    void activated();
 
 private:
+    void verifyProperties();
+    
     Ice::ObjectAdapterPtr mLocalAdapter;
     Ice::ObjectAdapterPtr mDiscoveryAdapter;
     Ice::ObjectAdapterPtr mManagementAdapter;
     AsteriskSCF::CollocatedIceStorm::CollocatedIceStormPtr mIceStorm;
     ReplicaPtr mReplicaService;
     ServiceLocatorStateReplicatorPrx mStateReplicator;
+    ServiceLocatorManagementImplPtr mLocatorServiceManagement;
+    IceStorm::TopicManagerPrx mTopicManager;
+    ServiceLocatorStateReplicatorListenerPrx mReplicatorListenerProxy;
+    bool mStandalone;
+    Ice::CommunicatorPtr mCommunicator;
+    string mBackplaneAdapterName;
+    string mServiceAdapterName;
 };
 
 /**
@@ -81,10 +92,11 @@ private:
 class ReplicaImpl : public Replica
 {
 public:
-    ReplicaImpl(Ice::ObjectAdapterPtr adapter) :
+    ReplicaImpl(Ice::ObjectAdapterPtr adapter, bool active) :
         mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter),
-        mActive(true) { }
+        mActive(active),
+        mServiceLocatorApp(0) { }
 
     bool isActive(const Ice::Current&)
     {
@@ -113,6 +125,11 @@ public:
                 listeners = mListeners;
             }
 
+            if (mServiceLocatorApp)
+            {
+                mServiceLocatorApp->activated();
+            }
+
             for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
                  listener != listeners.end();
                  ++listener)
@@ -155,6 +172,11 @@ public:
                 listeners = mListeners;
             }
 
+            if (mServiceLocatorApp)
+            {
+                mServiceLocatorApp->onStandby();
+            }
+
             for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
                  listener != listeners.end();
                  ++listener)
@@ -195,6 +217,11 @@ public:
         mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
     }
 
+    void setServiceLocatorApp(ServiceLocatorApp* serviceLocatorApp) // Registers the app whose activated()/onStandby() hooks this replica invokes on state changes.
+    {
+        mServiceLocatorApp = serviceLocatorApp; // Non-owning raw pointer (starts as 0); users of the member null-check it before calling through it.
+    }
+
 private:
     boost::shared_mutex mMutex;
 
@@ -205,6 +232,8 @@ private:
     vector<ReplicaListenerPrx> mListeners;
 
     bool mActive;
+
+    ServiceLocatorApp* mServiceLocatorApp; // Plain pointer to avoid smart pointer circular refs. 
 };
 
 /**
@@ -227,6 +256,7 @@ public:
      */
     void locateAll_async(const AMD_ServiceLocator_locateAllPtr&,
         const ServiceLocatorParamsPtr&, const ::Ice::Current&);
+
 private:
     /**
      * A pointer to the ServiceManagement implementation as that is where everything is
@@ -256,14 +286,51 @@ void ServiceLocatorImpl::locateAll_async(const AMD_ServiceLocator_locateAllPtr&
     mLocatorServiceManagement->locateAll(cb, params);
 }
 
+void ServiceLocatorApp::verifyProperties() // Checks Ice thread pool size properties against a minimum; bumps undersized per-adapter pools and warns about a small client pool.
+{ // NOTE(review): start() calls this before assigning mServiceAdapterName / mBackplaneAdapterName; unless they are set elsewhere, the adapter keys below reduce to ".ThreadPool.Size" - confirm.
+    const int defaultSize(4); // Minimum thread pool size this function enforces.
+    string strDefaultSize = boost::lexical_cast<std::string>(defaultSize); // String form of the minimum, used for setProperty() and log text.
+
+    Ice::Int defaultPoolSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0); // Communicator-wide server pool size; falls back to 0.
+    if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mServiceAdapterName + ".ThreadPool.Size", 0) < defaultSize) // Service adapter's own pool size is absent or below the minimum.
+    {
+        if (defaultPoolSize < defaultSize) // Override only when the communicator-wide server pool is also below the minimum.
+        {
+            lg(Info) << "Configured thread pool size for " << mServiceAdapterName + " is too small, defaulting to " << strDefaultSize;
+            mCommunicator->getProperties()->setProperty(mServiceAdapterName + ".ThreadPool.Size", strDefaultSize); // Write the minimum into the adapter-specific property.
+        }
+    }
+
+    if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mBackplaneAdapterName + ".ThreadPool.Size", 0) < defaultSize) // Same check for the backplane adapter.
+    {
+        if (defaultPoolSize < defaultSize) // Still compares against the server-wide value read above.
+        {
+            lg(Info) << "Configured Internal thread pool size for " << mBackplaneAdapterName + " is too small, defaulting to " << strDefaultSize;
+            mCommunicator->getProperties()->setProperty(mBackplaneAdapterName + ".ThreadPool.Size", strDefaultSize); // Write the minimum into the adapter-specific property.
+        }
+    }
+
+    defaultPoolSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 0); // Variable is reused: it now holds the client pool size.
+    if (defaultPoolSize < defaultSize) // NOTE(review): this branch only logs; no property is set even though the message says "defaulting to" - confirm intent.
+    {
+        lg(Warning) << "Client thread pool size is too small, defaulting to " << strDefaultSize;
+    }
+
+}
+
+
 void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr& communicator,
     const Ice::StringSeq&)
 {
+    mCommunicator = communicator;
+
+    verifyProperties();
+
     mIceStorm = new AsteriskSCF::CollocatedIceStorm::CollocatedIceStorm(appName, communicator->getProperties());
 
-    string backplaneAdapterName = appName + ".BackplaneAdapter";
-    mLocalAdapter = communicator->createObjectAdapterWithEndpoints(backplaneAdapterName,
-        communicator->getProperties()->getPropertyWithDefault(backplaneAdapterName + ".Endpoints", "tcp -p 4410"));
+    mBackplaneAdapterName = appName + ".BackplaneAdapter";
+    mLocalAdapter = communicator->createObjectAdapterWithEndpoints(mBackplaneAdapterName,
+        communicator->getProperties()->getPropertyWithDefault(mBackplaneAdapterName + ".Endpoints", "tcp -p 4410"));
 
     ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLocalAdapter);
 
@@ -275,11 +342,11 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
 
     /* Talk to the topic manager to either create or get the service discovery topic,
      * configured or default */
-    IceStorm::TopicManagerPrx topicManager = mIceStorm->createTopicManagerProxy(communicator);
+    mTopicManager = mIceStorm->createTopicManagerProxy(communicator);
 
     EventsPrx serviceDiscoveryTopic;
 
-    if (topicManager)
+    if (mTopicManager)
     {
         Ice::PropertiesPtr props = communicator->getProperties();
         string topicName = props->getProperty(appName + ".TopicName");
@@ -292,13 +359,13 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
         IceStorm::TopicPrx topic;
         try
         {
-            topic = topicManager->retrieve(topicName);
+            topic = mTopicManager->retrieve(topicName);
         }
         catch (const IceStorm::NoSuchTopic&)
         {
             try
             {
-                topic = topicManager->create(topicName);
+                topic = mTopicManager->create(topicName);
             }
             catch (const IceStorm::TopicExists&)
             {
@@ -315,7 +382,9 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
         lg(Info) << "IceStorm topic manager proxy not present, events disabled.";
     }
 
-    mReplicaService = new ReplicaImpl(mLocalAdapter);
+    mStandalone = getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false);
+    ReplicaImpl* replicaImpl = new ReplicaImpl(mLocalAdapter, mStandalone);
+    mReplicaService = replicaImpl;
     mLocalAdapter->add(mReplicaService, communicator->stringToIdentity(ReplicaServiceId));
 
     /* Management and discovery use separate adapters to provide a level of security,
@@ -323,28 +392,27 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
      * into the infrastructure while discovery as a read only function may be allowed to all.
      */
 
-    string managementAdapterName = appName + ".Management.ServiceAdapter";
-    mManagementAdapter = communicator->createObjectAdapterWithEndpoints(managementAdapterName,
-        communicator->getProperties()->getPropertyWithDefault(managementAdapterName + ".Endpoints", "tcp -p 4412"));
+    mServiceAdapterName = appName + ".Management.ServiceAdapter";
+    mManagementAdapter = communicator->createObjectAdapterWithEndpoints(mServiceAdapterName,
+        communicator->getProperties()->getPropertyWithDefault(mServiceAdapterName + ".Endpoints", "tcp -p 4412"));
 
-    ServiceLocatorManagementImplPtr locatorServiceManagement =
+    mLocatorServiceManagement =
         new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic, mReplicaService);
 
-    mManagementAdapter->add(locatorServiceManagement,
+    mManagementAdapter->add(mLocatorServiceManagement,
         communicator->stringToIdentity("LocatorServiceManagement"));
 
     mManagementAdapter->activate();
 
-    bool standAlone = getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false);
     try
     {
 	mStateReplicator = ServiceLocatorStateReplicatorPrx::checkedCast(communicator->propertyToProxy(appName + ".StateReplicator.Proxy"));
-        ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(locatorServiceManagement);
-        ServiceLocatorStateReplicatorListenerPrx replicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
+        ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(mLocatorServiceManagement);
+        mReplicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
 
-	locatorServiceManagement->setStateReplicator(mStateReplicator);
+	mLocatorServiceManagement->setStateReplicator(mStateReplicator);
 
-        if (standAlone)
+        if (mStandalone)
         {
             lg(Info) << "Operating in an active, standalone state and pushing updates." << endl;
         }
@@ -352,8 +420,6 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
         {
             // The default state is standby, in a replica group. 
             // In this state, the component must be activated via it's Replica interface.
-	    mReplicaService->standby(AsteriskSCF::Operations::createContext());
-
             if (mStateReplicator == 0)
             {
                 lg(Error) << "Operating in replica group with no access to state replicator defined!" << endl;
@@ -361,7 +427,6 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
                 assert(false);
             }
 
-            mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), replicatorListenerProxy);
             lg(Info) << "Operating as a standby replica." << endl;
         }
     }
@@ -381,35 +446,53 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
     mDiscoveryAdapter = communicator->createObjectAdapterWithEndpoints(locatorAdapterName,
         communicator->getProperties()->getPropertyWithDefault(locatorAdapterName + ".Endpoints", "tcp -p 4411"));
 
-    ServiceLocatorPtr locatorService = new ServiceLocatorImpl(locatorServiceManagement);
+    ServiceLocatorPtr locatorService = new ServiceLocatorImpl(mLocatorServiceManagement);
 
     mDiscoveryAdapter->add(locatorService, communicator->stringToIdentity("LocatorService"));
 
-    mDiscoveryAdapter->addFacet(locatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+    mDiscoveryAdapter->addFacet(mLocatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+
+    lg(Info) << "Publishing service discovery.";
+
+    replicaImpl->setServiceLocatorApp(this);
 
     // Make our IceStorm topic manager available to all if we are the active service. This is because by adding it
     // we will replicate it.
     if (mReplicaService->isActive() == true)
     {
-	ServiceManagementPrx icestormManagement = locatorServiceManagement->addService(
-            AsteriskSCF::Operations::createContext(), topicManager, "TopicManager", Ice::Current());
-	ServiceLocatorParamsPtr params = new ServiceLocatorParams;
-	params->category = TopicManagerCategory;
-	params->service = "default";
-	icestormManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
+        activated();
+        lg(Info) << "Waiting for requests.";
+        return;
     }
 
-    mDiscoveryAdapter->activate();
+    onStandby();
+    lg(Info) << "In standby mode.";
 
-    lg(Info) << "Publishing service discovery.";
+    mDiscoveryAdapter->activate();
+}
 
-    if (standAlone)
+void ServiceLocatorApp::activated() // Hook run when this locator becomes active: called by ReplicaImpl through its app pointer and by start() when the replica is already active.
+{
+    if (!mStandalone) // Only a non-standalone instance deals with the state replicator.
     {
-        lg(Info) << "Waiting for requests.";
+        // Stop listening to state replicator.
+        mStateReplicator->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy); // NOTE(review): mStateReplicator is not null-checked here - confirm it is always set when not standalone.
     }
-    else
+
+    ServiceManagementPrx icestormManagement = mLocatorServiceManagement->addService( // Registers the topic manager proxy with the locator under the name "TopicManager".
+    AsteriskSCF::Operations::createContext(), mTopicManager, "TopicManager", Ice::Current()); // NOTE(review): start() treats a null mTopicManager as "events disabled"; it is passed here unchecked - confirm.
+    ServiceLocatorParamsPtr params = new ServiceLocatorParams; // Locator parameters attached to the registration just made.
+    params->category = TopicManagerCategory;
+    params->service = "default";
+    icestormManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, ""); // Third argument is an empty string.
+}
+
+void ServiceLocatorApp::onStandby() // Hook run when this locator goes to standby: called by ReplicaImpl through its app pointer and by start() when the replica is not active.
+{
+    if (!mStandalone) // A standalone instance does nothing here.
     {
-        lg(Info) << "In stanby mode.";
+        // Listen to state replicator.
+        mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy); // NOTE(review): mStateReplicator is not null-checked here - confirm it is always set when not standalone.
     }
 }
 

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/servicediscovery.git



More information about the asterisk-scf-commits mailing list