[asterisk-scf-commits] asterisk-scf/integration/servicediscovery.git branch "replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Dec 16 15:10:06 UTC 2010
branch "replication" has been updated
via f7bf1b966c3652008c3e03ec0529e1b99a3f920c (commit)
from aed907cf26570f667cc4fab752d47e85650287f8 (commit)
Summary of changes:
config/test_component.config.in | 17 +++++-
src/CMakeLists.txt | 1 +
src/ServiceLocator.cpp | 120 ++++++++++++++++++++++++++++++++++----
src/ServiceLocatorManagement.cpp | 66 +++++++++++++++++++-
src/ServiceLocatorManagement.h | 6 ++-
5 files changed, 192 insertions(+), 18 deletions(-)
- Log -----------------------------------------------------------------
commit f7bf1b966c3652008c3e03ec0529e1b99a3f920c
Author: Joshua Colp <jcolp at digium.com>
Date: Thu Dec 16 11:08:58 2010 -0400
Get the service locator talking to the state replicator and pushing out state items.
diff --git a/config/test_component.config.in b/config/test_component.config.in
index b793c27..6d13371 100644
--- a/config/test_component.config.in
+++ b/config/test_component.config.in
@@ -40,11 +40,20 @@ AsteriskSCFIceStorm.Trace.TopicManager=2
#
AsteriskSCFIceStorm.Flush.Timeout=2000
+# Service Locator State Replicator Configuration
+IceBox.Service.ServiceDiscoveryStateReplicator=../src at ServiceLocatorStateReplicator:create
+
+# Test endpoints for the state replicator
+ServiceLocatorStateReplicator.Endpoints=tcp -p 4413:udp -p 4413
+
#
# ServiceDiscovery configuration
#
IceBox.Service.ServiceDiscovery=../src at service_locator:create
+# Test endpoints for the service locator local adapter
+ServiceLocatorLocalAdapter.Endpoints=tcp -p 4412
+
# Test endpoints for the service locator management adapter
ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
@@ -54,6 +63,12 @@ ServiceLocatorAdapter.Endpoints=tcp -p 4411
# Test endpoints for IceStorm
TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+# Proxy to the state replicator
+ServiceLocator.StateReplicator.Proxy=ServiceLocatorStateReplicatorService:tcp -p 4413
+
+# Configure ourselves as a master ("yes" would instead register us as a state replicator listener and put us in standby)
+ServiceLocatorStateReplicatorListener=no
+
#
# Logger configuration
#
@@ -70,4 +85,4 @@ ServiceLocatorManagement.proxy=ServiceLocatorManagement:tcp -p 4422
#
# IceBox load order
#
-IceBox.LoadOrder=ServiceDiscovery,ServiceDiscoveryTest
+IceBox.LoadOrder=ServiceDiscoveryStateReplicator,ServiceDiscovery,ServiceDiscoveryTest
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 159fd8f..6ee0ff4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -16,6 +16,7 @@ asterisk_scf_component_add_slice(service_locator ServiceLocatorIf)
asterisk_scf_component_add_slice(service_locator ServiceLocatorEventsIf)
asterisk_scf_component_add_slice(service_locator ServiceLocatorStateReplicationIf)
asterisk_scf_component_add_slice(service_locator ComponentServiceIf)
+asterisk_scf_component_add_slice(service_locator ReplicaIf)
asterisk_scf_component_add_file(service_locator ServiceLocator.cpp)
asterisk_scf_component_add_file(service_locator ServiceLocatorManagement.cpp)
diff --git a/src/ServiceLocator.cpp b/src/ServiceLocator.cpp
index d5242fa..0e2535d 100644
--- a/src/ServiceLocator.cpp
+++ b/src/ServiceLocator.cpp
@@ -20,9 +20,11 @@
#include "Core/Discovery/ServiceLocatorIf.h"
#include "Core/Discovery/ServiceLocatorEventsIf.h"
+#include "System/Component/ReplicaIf.h"
#include "ServiceLocatorManagement.h"
#include "ServiceManagement.h"
+#include "ServiceLocatorStateReplicator.h"
#include "CollocatedIceStorm.h"
#include "logger.h"
#include "IceLogger.h"
@@ -32,11 +34,14 @@ using namespace AsteriskSCF::System::Discovery;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::ServiceDiscovery;
+using namespace AsteriskSCF::System::Component::V1;
namespace
{
Logger const &lg = getLoggerFactory().getLogger("AsteriskSCF.System.Discovery");
+static const string ReplicaServiceId("ServiceLocatorReplica");
+
/**
* Implementation of the Ice::Application class
*/
@@ -49,10 +54,68 @@ public:
void stop();
private:
+ Ice::ObjectAdapterPtr mLocalAdapter;
Ice::ObjectAdapterPtr mDiscoveryAdapter;
Ice::ObjectAdapterPtr mManagementAdapter;
Ice::ObjectAdapterPtr mLoggerAdapter;
AsteriskSCF::ServiceDiscovery::CollocatedIceStormPtr mIceStorm;
+ ReplicaPtr mReplicaService;
+ ServiceLocatorStateReplicatorPrx mStateReplicator;
+};
+
+/**
+ * Implementation of the Replica interface.
+ */
+class ReplicaImpl : public Replica
+{
+public:
+ ReplicaImpl(Ice::ObjectAdapterPtr adapter) : mAdapter(adapter), mPaused(false), mActive(true) { }
+
+ bool isActive(const Ice::Current&)
+ {
+ return mActive;
+ }
+
+ bool activate(const Ice::Current&)
+ {
+ mActive = true;
+
+ for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+ {
+ (*listener)->activated(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+ }
+
+ return true;
+ }
+
+ void standby(const Ice::Current&)
+ {
+ mActive = false;
+
+ for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+ {
+ (*listener)->onStandby(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+ }
+ }
+
+ void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+ {
+ mListeners.push_back(listener);
+ }
+
+ void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+ {
+ mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+ }
+
+private:
+ Ice::ObjectAdapterPtr mAdapter;
+
+ vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+
+ bool mPaused;
+
+ bool mActive;
};
/**
@@ -61,7 +124,7 @@ private:
class ServiceLocatorImpl : public ServiceLocator
{
public:
- ServiceLocatorImpl(ServiceLocatorManagementImpl* LocatorServiceManagement) :
+ ServiceLocatorImpl(ServiceLocatorManagementImplPtr LocatorServiceManagement) :
mLocatorServiceManagement(LocatorServiceManagement) { };
/**
* Asynchronously locate a service for the given parameters.
@@ -81,7 +144,7 @@ private:
* actually stored. Our ServiceLocator implementation simply acts as a read-only
* frontend to it.
*/
- ServiceLocatorManagementImpl* mLocatorServiceManagement;
+ ServiceLocatorManagementImplPtr mLocatorServiceManagement;
};
}
@@ -107,6 +170,14 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
{
mIceStorm = new AsteriskSCF::ServiceDiscovery::CollocatedIceStorm("AsteriskSCFIceStorm", communicator->getProperties());
+ mLoggerAdapter = communicator->createObjectAdapter("LoggerAdapter");
+
+ ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLoggerAdapter);
+
+ getLoggerFactory().setLogOutput(mIceLogger->getLogger());
+
+ mLoggerAdapter->activate();
+
lg(Info) << "Initializing service discovery component";
/* Talk to the topic manager to either create or get the service discovery topic,
@@ -152,6 +223,14 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
lg(Info) << "IceStorm topic manager proxy not present, events disabled.";
}
+ mLocalAdapter = communicator->createObjectAdapter(
+ "ServiceLocatorLocalAdapter");
+
+ mReplicaService = new ReplicaImpl(mLocalAdapter);
+ mLocalAdapter->add(mReplicaService, communicator->stringToIdentity(ReplicaServiceId));
+
+ mLocalAdapter->activate();
+
/* Management and discovery use separate adapters to provide a level of security:
* management may need to be protected so arbitrary people can't inject bad services
* into the infrastructure, while discovery, as a read-only function, may be allowed to all.
@@ -159,14 +238,39 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
mManagementAdapter= communicator->createObjectAdapter(
"ServiceLocatorManagementAdapter");
- ServiceLocatorManagementImpl* locatorServiceManagement =
- new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic);
+ ServiceLocatorManagementImplPtr locatorServiceManagement =
+ new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic, mReplicaService);
mManagementAdapter->add(locatorServiceManagement,
communicator->stringToIdentity("LocatorServiceManagement"));
mManagementAdapter->activate();
+ try
+ {
+ mStateReplicator = ServiceLocatorStateReplicatorPrx::checkedCast(communicator->propertyToProxy("ServiceLocator.StateReplicator.Proxy"));
+ ServiceLocatorStateReplicatorListenerPtr mReplicatorListener = new ServiceLocatorStateReplicatorListenerI(locatorServiceManagement);
+ ServiceLocatorStateReplicatorListenerPrx mReplicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
+
+ locatorServiceManagement->setStateReplicator(mStateReplicator);
+
+ if (communicator->getProperties()->getPropertyWithDefault("ServiceLocatorStateReplicatorListener", "no") == "yes")
+ {
+ mStateReplicator->addListener(mReplicatorListenerProxy);
+ mReplicaService->standby();
+ lg(Info) << "Operating as a standby replica." << endl;
+ }
+ else
+ {
+ lg(Info) << "Operating in an active state and pushing updates." << endl;
+ }
+ }
+ catch (...)
+ {
+ // If we reach this point then no state replicator is present and we are acting in a stand-alone fashion
+ lg(Info) << "Operating in an active and standalone state." << endl;
+ }
+
lg(Info) << "Activated service discovery management.";
mDiscoveryAdapter = communicator->createObjectAdapter("ServiceLocatorAdapter");
@@ -177,14 +281,6 @@ void ServiceLocatorApp::start(const string& name, const Ice::CommunicatorPtr& co
mDiscoveryAdapter->activate();
- mLoggerAdapter = communicator->createObjectAdapter("LoggerAdapter");
-
- ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLoggerAdapter);
-
- getLoggerFactory().setLogOutput(mIceLogger->getLogger());
-
- mLoggerAdapter->activate();
-
lg(Info) << "Activated service discovery.";
lg(Info) << "Waiting for requests.";
diff --git a/src/ServiceLocatorManagement.cpp b/src/ServiceLocatorManagement.cpp
index b400a85..c1f63dd 100644
--- a/src/ServiceLocatorManagement.cpp
+++ b/src/ServiceLocatorManagement.cpp
@@ -324,8 +324,9 @@ class AsteriskSCF::ServiceDiscovery::ServiceLocatorManagementImplPriv :
{
public:
ServiceLocatorManagementImplPriv(const Ice::ObjectAdapterPtr& adapter,
- const EventsPrx& serviceDiscoveryTopic) :
- mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic)
+ const EventsPrx& serviceDiscoveryTopic,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr replicaService) :
+ mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic), mReplicaService(replicaService)
{
};
@@ -355,6 +356,16 @@ public:
* A proxy that can be used to publish locator events.
*/
AsteriskSCF::System::Discovery::EventsPrx mLocatorTopic;
+
+ /**
+ * A proxy to the state replicator we are pushing updates to.
+ */
+ ServiceLocatorStateReplicatorPrx mStateReplicator;
+
+ /**
+ * A pointer to an instance of our replica service.
+ */
+ AsteriskSCF::System::Component::V1::ReplicaPtr mReplicaService;
};
/**
@@ -362,8 +373,9 @@ public:
*/
ServiceLocatorManagementImpl::ServiceLocatorManagementImpl(
const Ice::ObjectAdapterPtr& adapter,
- const EventsPrx& serviceDiscoveryTopic) :
- mImpl(new ServiceLocatorManagementImplPriv(adapter, serviceDiscoveryTopic))
+ const EventsPrx& serviceDiscoveryTopic,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr replicaService) :
+ mImpl(new ServiceLocatorManagementImplPriv(adapter, serviceDiscoveryTopic, replicaService))
{
}
@@ -561,6 +573,11 @@ void ServiceLocatorManagementImpl::replicateState(AsteriskSCF::Core::Discovery::
AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsStateItemPtr params,
AsteriskSCF::Core::Discovery::V1::ServiceLocatorComparatorStateItemPtr comparator)
{
+ if (!mImpl->mStateReplicator || mImpl->mReplicaService->isActive() == false)
+ {
+ return;
+ }
+
ServiceLocatorStateItemSeq items;
if (service)
@@ -577,6 +594,20 @@ void ServiceLocatorManagementImpl::replicateState(AsteriskSCF::Core::Discovery::
{
items.push_back(comparator);
}
+
+ if (items.size() == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+ oneway->setState(items);
+ }
+ catch (...)
+ {
+ }
}
/**
@@ -586,6 +617,11 @@ void ServiceLocatorManagementImpl::removeState(AsteriskSCF::Core::Discovery::V1:
AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsStateItemPtr params,
AsteriskSCF::Core::Discovery::V1::ServiceLocatorComparatorStateItemPtr comparator)
{
+ if (!mImpl->mStateReplicator || mImpl->mReplicaService->isActive() == false)
+ {
+ return;
+ }
+
Ice::StringSeq items;
if (service)
@@ -602,4 +638,26 @@ void ServiceLocatorManagementImpl::removeState(AsteriskSCF::Core::Discovery::V1:
{
items.push_back(comparator->key);
}
+
+ if (items.size() == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+ oneway->removeState(items);
+ }
+ catch (...)
+ {
+ }
+}
+
+/**
+ * Function which sets the proxy to the state replicator.
+ */
+void ServiceLocatorManagementImpl::setStateReplicator(ServiceLocatorStateReplicatorPrx stateReplicator)
+{
+ mImpl->mStateReplicator = stateReplicator;
}
diff --git a/src/ServiceLocatorManagement.h b/src/ServiceLocatorManagement.h
index 214bb90..df75daf 100644
--- a/src/ServiceLocatorManagement.h
+++ b/src/ServiceLocatorManagement.h
@@ -20,6 +20,8 @@
#include <IceUtil/Shared.h>
+#include "System/Component/ReplicaIf.h"
+
#include "ServiceLocatorStateReplicationIf.h"
namespace AsteriskSCF
@@ -65,7 +67,8 @@ class ServiceLocatorManagementImpl :
{
public:
ServiceLocatorManagementImpl(const Ice::ObjectAdapterPtr& adapter,
- const AsteriskSCF::System::Discovery::EventsPrx& serviceDiscoveryTopic);
+ const AsteriskSCF::System::Discovery::EventsPrx& serviceDiscoveryTopic,
+ const AsteriskSCF::System::Component::V1::ReplicaPtr);
void locate(
const AsteriskSCF::Core::Discovery::V1::AMD_ServiceLocator_locatePtr&,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&);
@@ -82,6 +85,7 @@ public:
void removeState(AsteriskSCF::Core::Discovery::V1::ServiceLocatorStateItemPtr,
AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsStateItemPtr,
AsteriskSCF::Core::Discovery::V1::ServiceLocatorComparatorStateItemPtr);
+ void setStateReplicator(AsteriskSCF::Core::Discovery::V1::ServiceLocatorStateReplicatorPrx);
//
// AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagement interface.
-----------------------------------------------------------------------
--
asterisk-scf/integration/servicediscovery.git
More information about the asterisk-scf-commits
mailing list