[asterisk-scf-commits] asterisk-scf/integration/servicediscovery.git branch "replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Dec 16 15:10:06 UTC 2010


branch "replication" has been updated
       via  f7bf1b966c3652008c3e03ec0529e1b99a3f920c (commit)
      from  aed907cf26570f667cc4fab752d47e85650287f8 (commit)

Summary of changes:
 config/test_component.config.in  |   17 +++++-
 src/CMakeLists.txt               |    1 +
 src/ServiceLocator.cpp           |  120 ++++++++++++++++++++++++++++++++++----
 src/ServiceLocatorManagement.cpp |   66 +++++++++++++++++++-
 src/ServiceLocatorManagement.h   |    6 ++-
 5 files changed, 192 insertions(+), 18 deletions(-)


- Log -----------------------------------------------------------------
commit f7bf1b966c3652008c3e03ec0529e1b99a3f920c
Author: Joshua Colp <jcolp at digium.com>
Date:   Thu Dec 16 11:08:58 2010 -0400

    Get the service locator talking to the state replicator and pushing out state items.

diff --git a/config/test_component.config.in b/config/test_component.config.in
index b793c27..6d13371 100644
--- a/config/test_component.config.in
+++ b/config/test_component.config.in
@@ -40,11 +40,20 @@ AsteriskSCFIceStorm.Trace.TopicManager=2
 #
 AsteriskSCFIceStorm.Flush.Timeout=2000
 
+# Service Locator State Replicator Configuration
+IceBox.Service.ServiceDiscoveryStateReplicator=../src@ServiceLocatorStateReplicator:create
+
+# Test endpoints for the state replicator
+ServiceLocatorStateReplicator.Endpoints=tcp -p 4413:udp -p 4413
+
 #
 # ServiceDiscovery configuration
 #
 IceBox.Service.ServiceDiscovery=../src@service_locator:create
 
+# Test endpoints for the service locator local adapter
+ServiceLocatorLocalAdapter.Endpoints=tcp -p 4412
+
 # Test endpoints for the service locator management adapter
 ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
 
@@ -54,6 +63,12 @@ ServiceLocatorAdapter.Endpoints=tcp -p 4411
 # Test endpoints for IceStorm
 TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
 
+# Proxy to the state replicator
+ServiceLocator.StateReplicator.Proxy=ServiceLocatorStateReplicatorService:tcp -p 4413
+
+# Configure ourselves as a master
+ServiceLocatorStateReplicatorListener=no
+
 #
 # Logger configuration
 #
@@ -70,4 +85,4 @@ ServiceLocatorManagement.proxy=ServiceLocatorManagement:tcp -p 4422
 #
 # IceBox load order
 #
-IceBox.LoadOrder=ServiceDiscovery,ServiceDiscoveryTest
+IceBox.LoadOrder=ServiceDiscoveryStateReplicator,ServiceDiscovery,ServiceDiscoveryTest
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 159fd8f..6ee0ff4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -16,6 +16,7 @@ asterisk_scf_component_add_slice(service_locator ServiceLocatorIf)
 asterisk_scf_component_add_slice(service_locator ServiceLocatorEventsIf)
 asterisk_scf_component_add_slice(service_locator ServiceLocatorStateReplicationIf)
 asterisk_scf_component_add_slice(service_locator ComponentServiceIf)
+asterisk_scf_component_add_slice(service_locator ReplicaIf)
 
 asterisk_scf_component_add_file(service_locator ServiceLocator.cpp)
 asterisk_scf_component_add_file(service_locator ServiceLocatorManagement.cpp)
diff --git a/src/ServiceLocator.cpp b/src/ServiceLocator.cpp
index d5242fa..0e2535d 100644
--- a/src/ServiceLocator.cpp
+++ b/src/ServiceLocator.cpp
@@ -20,9 +20,11 @@
 
 #include "Core/Discovery/ServiceLocatorIf.h"
 #include "Core/Discovery/ServiceLocatorEventsIf.h"
+#include "System/Component/ReplicaIf.h"
 
 #include "ServiceLocatorManagement.h"
 #include "ServiceManagement.h"
+#include "ServiceLocatorStateReplicator.h"
 #include "CollocatedIceStorm.h"
 #include "logger.h"
 #include "IceLogger.h"
@@ -32,11 +34,14 @@ using namespace AsteriskSCF::System::Discovery;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::ServiceDiscovery;
+using namespace AsteriskSCF::System::Component::V1;
 
 namespace
 {
 Logger const &lg = getLoggerFactory().getLogger("AsteriskSCF.System.Discovery");
 
+static const string ReplicaServiceId("ServiceLocatorReplica");
+
 /**
  * Implementation of the Ice::Application class
  */
@@ -49,10 +54,68 @@ public:
     void stop();
 
 private:
+    Ice::ObjectAdapterPtr mLocalAdapter;
     Ice::ObjectAdapterPtr mDiscoveryAdapter;
     Ice::ObjectAdapterPtr mManagementAdapter;
     Ice::ObjectAdapterPtr mLoggerAdapter;
     AsteriskSCF::ServiceDiscovery::CollocatedIceStormPtr mIceStorm;
+    ReplicaPtr mReplicaService;
+    ServiceLocatorStateReplicatorPrx mStateReplicator;
+};
+
+/**
+ * Implementation of the Replica interface.
+ */
+class ReplicaImpl : public Replica
+{
+public:
+    ReplicaImpl(Ice::ObjectAdapterPtr adapter) : mAdapter(adapter), mPaused(false), mActive(true) { }
+
+    bool isActive(const Ice::Current&)
+    {
+        return mActive;
+    }
+
+    bool activate(const Ice::Current&)
+    {
+        mActive = true;
+
+        for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+        {
+            (*listener)->activated(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+        }
+
+        return true;
+    }
+
+    void standby(const Ice::Current&)
+    {
+        mActive = false;
+
+        for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+        {
+            (*listener)->onStandby(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+        }
+    }
+
+    void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    {
+        mListeners.push_back(listener);
+    }
+
+    void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    {
+        mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+    }
+
+private:
+    Ice::ObjectAdapterPtr mAdapter;
+
+    vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+
+    bool mPaused;
+
+    bool mActive;
 };
 
 /**
@@ -61,7 +124,7 @@ private:
 class ServiceLocatorImpl : public ServiceLocator
 {
 public:
-    ServiceLocatorImpl(ServiceLocatorManagementImpl* LocatorServiceManagement) :
+    ServiceLocatorImpl(ServiceLocatorManagementImplPtr LocatorServiceManagement) :
         mLocatorServiceManagement(LocatorServiceManagement) { };
     /**
      * Asynchronously locate a service for the given parameters.
@@ -81,7 +144,7 @@ private:
      * actually stored. Our ServiceLocator implementation simply acts as a read-only
      *  frontend to it.
      */
-    ServiceLocatorManagementImpl* mLocatorServiceManagement;
+    ServiceLocatorManagementImplPtr mLocatorServiceManagement;
 };
 
 }
@@ -107,6 +170,14 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
 {
     mIceStorm = new AsteriskSCF::ServiceDiscovery::CollocatedIceStorm("AsteriskSCFIceStorm", communicator->getProperties());
 
+    mLoggerAdapter = communicator->createObjectAdapter("LoggerAdapter");
+
+    ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLoggerAdapter);
+
+    getLoggerFactory().setLogOutput(mIceLogger->getLogger());
+
+    mLoggerAdapter->activate();
+
     lg(Info) << "Initializing service discovery component";
 
     /* Talk to the topic manager to either create or get the service discovery topic,
@@ -152,6 +223,14 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
         lg(Info) << "IceStorm topic manager proxy not present, events disabled.";
     }
 
+    mLocalAdapter = communicator->createObjectAdapter(
+        "ServiceLocatorLocalAdapter");
+
+    mReplicaService = new ReplicaImpl(mLocalAdapter);
+    mLocalAdapter->add(mReplicaService, communicator->stringToIdentity(ReplicaServiceId));
+
+    mLocalAdapter->activate();
+
     /* Management and discovery use separate adapters to provide a level of security,
      * management may want to be protected so arbitrary people can't inject bad services
      * into the infrastructure while discovery as a read only function may be allowed to all.
@@ -159,14 +238,39 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
     mManagementAdapter= communicator->createObjectAdapter(
         "ServiceLocatorManagementAdapter");
 
-    ServiceLocatorManagementImpl* locatorServiceManagement =
-        new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic);
+    ServiceLocatorManagementImplPtr locatorServiceManagement =
+        new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic, mReplicaService);
 
     mManagementAdapter->add(locatorServiceManagement,
         communicator->stringToIdentity("LocatorServiceManagement"));
 
     mManagementAdapter->activate();
 
+    try
+    {
+	mStateReplicator = ServiceLocatorStateReplicatorPrx::checkedCast(communicator->propertyToProxy("ServiceLocator.StateReplicator.Proxy"));
+        ServiceLocatorStateReplicatorListenerPtr mReplicatorListener = new ServiceLocatorStateReplicatorListenerI(locatorServiceManagement);
+        ServiceLocatorStateReplicatorListenerPrx mReplicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
+
+	locatorServiceManagement->setStateReplicator(mStateReplicator);
+
+        if (communicator->getProperties()->getPropertyWithDefault("ServiceLocatorStateReplicatorListener", "no") == "yes")
+        {
+            mStateReplicator->addListener(mReplicatorListenerProxy);
+            mReplicaService->standby();
+            lg(Info) << "Operating as a standby replica." << endl;
+        }
+        else
+        {
+            lg(Info) << "Operating in an active state and pushing updates." << endl;
+        }
+    }
+    catch (...)
+    {
+	// If we reach this point then no state replicator is present and we are acting in a stand-alone fashion
+	lg(Info) << "Operating in an active and standalone state." << endl;
+    }
+
     lg(Info) << "Activated service discovery management.";
 
     mDiscoveryAdapter = communicator->createObjectAdapter("ServiceLocatorAdapter");
@@ -177,14 +281,6 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
 
     mDiscoveryAdapter->activate();
 
-    mLoggerAdapter = communicator->createObjectAdapter("LoggerAdapter");
-
-    ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLoggerAdapter);
-
-    getLoggerFactory().setLogOutput(mIceLogger->getLogger());
-
-    mLoggerAdapter->activate();
-
     lg(Info) << "Activated service discovery.";
 
     lg(Info) << "Waiting for requests.";
diff --git a/src/ServiceLocatorManagement.cpp b/src/ServiceLocatorManagement.cpp
index b400a85..c1f63dd 100644
--- a/src/ServiceLocatorManagement.cpp
+++ b/src/ServiceLocatorManagement.cpp
@@ -324,8 +324,9 @@ class AsteriskSCF::ServiceDiscovery::ServiceLocatorManagementImplPriv :
 {
 public:
     ServiceLocatorManagementImplPriv(const Ice::ObjectAdapterPtr& adapter,
-        const EventsPrx& serviceDiscoveryTopic) :
-        mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic)
+        const EventsPrx& serviceDiscoveryTopic,
+	const AsteriskSCF::System::Component::V1::ReplicaPtr replicaService) :
+        mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic), mReplicaService(replicaService)
     {
     };
 
@@ -355,6 +356,16 @@ public:
      * A proxy that can be used to publish locator events.
      */
     AsteriskSCF::System::Discovery::EventsPrx mLocatorTopic;
+
+    /**
+     * A proxy to the state replicator we are pushing updates to.
+     */
+    ServiceLocatorStateReplicatorPrx mStateReplicator;
+
+    /**
+     * A pointer to an instance of our replica service.
+     */
+    AsteriskSCF::System::Component::V1::ReplicaPtr mReplicaService;
 };
 
 /**
@@ -362,8 +373,9 @@ public:
  */
 ServiceLocatorManagementImpl::ServiceLocatorManagementImpl(
     const Ice::ObjectAdapterPtr& adapter,
-    const EventsPrx& serviceDiscoveryTopic) :
-    mImpl(new ServiceLocatorManagementImplPriv(adapter, serviceDiscoveryTopic))
+    const EventsPrx& serviceDiscoveryTopic,
+    const AsteriskSCF::System::Component::V1::ReplicaPtr replicaService) :
+    mImpl(new ServiceLocatorManagementImplPriv(adapter, serviceDiscoveryTopic, replicaService))
 {
 }
 
@@ -561,6 +573,11 @@ void ServiceLocatorManagementImpl::replicateState(AsteriskSCF::Core::Discovery::
     AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsStateItemPtr params,
     AsteriskSCF::Core::Discovery::V1::ServiceLocatorComparatorStateItemPtr comparator)
 {
+    if (!mImpl->mStateReplicator || mImpl->mReplicaService->isActive() == false)
+    {
+        return;
+    }
+
     ServiceLocatorStateItemSeq items;
 
     if (service)
@@ -577,6 +594,20 @@ void ServiceLocatorManagementImpl::replicateState(AsteriskSCF::Core::Discovery::
     {
 	items.push_back(comparator);
     }
+
+    if (items.size() == 0)
+    {
+        return;
+    }
+
+    try
+    {
+        ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+        oneway->setState(items);
+    }
+    catch (...)
+    {
+    }
 }
 
 /**
@@ -586,6 +617,11 @@ void ServiceLocatorManagementImpl::removeState(AsteriskSCF::Core::Discovery::V1:
     AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsStateItemPtr params,
     AsteriskSCF::Core::Discovery::V1::ServiceLocatorComparatorStateItemPtr comparator)
 {
+    if (!mImpl->mStateReplicator || mImpl->mReplicaService->isActive() == false)
+    {
+        return;
+    }
+
     Ice::StringSeq items;
 
     if (service)
@@ -602,4 +638,26 @@ void ServiceLocatorManagementImpl::removeState(AsteriskSCF::Core::Discovery::V1:
     {
 	items.push_back(comparator->key);
     }
+
+    if (items.size() == 0)
+    {
+        return;
+    }
+
+    try
+    {
+        ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+        oneway->removeState(items);
+    }
+    catch (...)
+    {
+    }
+}
+
+/**
+ * Function which sets the proxy to the state replicator.
+ */
+void ServiceLocatorManagementImpl::setStateReplicator(ServiceLocatorStateReplicatorPrx stateReplicator)
+{
+    mImpl->mStateReplicator = stateReplicator;
 }
diff --git a/src/ServiceLocatorManagement.h b/src/ServiceLocatorManagement.h
index 214bb90..df75daf 100644
--- a/src/ServiceLocatorManagement.h
+++ b/src/ServiceLocatorManagement.h
@@ -20,6 +20,8 @@
 
 #include <IceUtil/Shared.h>
 
+#include "System/Component/ReplicaIf.h"
+
 #include "ServiceLocatorStateReplicationIf.h"
 
 namespace AsteriskSCF
@@ -65,7 +67,8 @@ class ServiceLocatorManagementImpl :
 {
 public:
     ServiceLocatorManagementImpl(const Ice::ObjectAdapterPtr& adapter,
-        const AsteriskSCF::System::Discovery::EventsPrx& serviceDiscoveryTopic);
+        const AsteriskSCF::System::Discovery::EventsPrx& serviceDiscoveryTopic,
+	const AsteriskSCF::System::Component::V1::ReplicaPtr);
     void locate(
         const AsteriskSCF::Core::Discovery::V1::AMD_ServiceLocator_locatePtr&,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&);
@@ -82,6 +85,7 @@ public:
     void removeState(AsteriskSCF::Core::Discovery::V1::ServiceLocatorStateItemPtr,
         AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsStateItemPtr,
         AsteriskSCF::Core::Discovery::V1::ServiceLocatorComparatorStateItemPtr);	
+    void setStateReplicator(AsteriskSCF::Core::Discovery::V1::ServiceLocatorStateReplicatorPrx);
 
     //
     // AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagement interface.

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/servicediscovery.git



More information about the asterisk-scf-commits mailing list