[asterisk-scf-commits] asterisk-scf/release/servicediscovery.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:29:13 CDT 2012
branch "master" has been updated
via f8976984cbb446e42c72161be404aee13380a2bb (commit)
from ab11dc1d69e959ef7966bb359433060723b77c27 (commit)
Summary of changes:
config/test_component.conf | 4 +-
.../ServiceLocatorStateReplicationIf.ice | 100 +++---
src/ServiceLocator.cpp | 343 ++++++++++++++++----
src/ServiceLocatorManagement.cpp | 130 ++++++--
src/ServiceLocatorManagement.h | 5 +-
src/ServiceLocatorStateListener.cpp | 215 ++++++++-----
src/ServiceLocatorStateReplicator.h | 4 +-
src/ServiceLocatorStateReplicatorApp.cpp | 11 +-
src/ServiceManagement.cpp | 159 +++++++---
src/ServiceManagement.h | 9 +-
test/CMakeLists.txt | 2 +
test/TestComparatorBlocking.cpp | 11 +-
test/TestServiceLocator.cpp | 168 +++++++++--
13 files changed, 853 insertions(+), 308 deletions(-)
- Log -----------------------------------------------------------------
commit f8976984cbb446e42c72161be404aee13380a2bb
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 12:06:58 2012 -0500
Changes for new retry logic.
diff --git a/config/test_component.conf b/config/test_component.conf
index 7ea0784..c4a3217 100644
--- a/config/test_component.conf
+++ b/config/test_component.conf
@@ -3,7 +3,7 @@
#
# Ice configuration
#
-
+Ice.ThreadPool.Client.Size=4
# Collocation is incompatible with AMI/AMD when sharing a communicator
Ice.Default.CollocationOptimized=0
@@ -38,7 +38,7 @@ ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
ServiceDiscovery.StateReplicator.Proxy=ServiceLocatorStateReplicatorService:tcp -h 127.0.0.1 -p 4413
# Configure ourselves as a master
-ServiceDiscovery.Standalone=no
+ServiceDiscovery.Standalone=yes
ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 4421
ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 4422:udp -h 127.0.0.1 -p 4422
diff --git a/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice b/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice
index 3f509a9..65c0420 100644
--- a/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -19,6 +19,7 @@
#include <Ice/BuiltinSequences.ice>
#include <Ice/Identity.ice>
#include "AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice"
+#include <AsteriskSCF/System/OperationsIf.ice>
module AsteriskSCF
{
@@ -32,52 +33,57 @@ module ServiceLocator
["suppress"]
module V1
{
- const string StateReplicatorComponentCategory = "ServiceLocatorStateReplicatorComponent";
- const string StateReplicatorDiscoveryCategory = "ServiceLocatorStateReplicator";
-
- class ServiceLocatorStateItem
- {
- string key;
- };
-
- sequence<ServiceLocatorStateItem> ServiceLocatorStateItemSeq;
-
- interface ServiceLocatorStateReplicatorListener
- {
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateSet(ServiceLocatorStateItemSeq items);
- };
-
- interface ServiceLocatorStateReplicator
- {
- void addListener(ServiceLocatorStateReplicatorListener *listener);
- void removeListener(ServiceLocatorStateReplicatorListener *listener);
- void setState (ServiceLocatorStateItemSeq items);
- void removeState(Ice::StringSeq items);
- idempotent ServiceLocatorStateItemSeq getState(Ice::StringSeq itemKeys);
- idempotent ServiceLocatorStateItemSeq getAllState();
- };
-
- class ServiceLocatorServiceStateItem extends ServiceLocatorStateItem
- {
- bool suspended;
- Object *service;
- Ice::Identity managementIdentity;
- string guid;
- };
-
- class ServiceLocatorParamsStateItem extends ServiceLocatorStateItem
- {
- string serviceKey;
- AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams params;
- string compareGuid;
- };
-
- class ServiceLocatorComparatorStateItem extends ServiceLocatorStateItem
- {
- string name;
- AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsCompare *comparator;
- };
+
+const string StateReplicatorComponentCategory = "ServiceLocatorStateReplicatorComponent";
+const string StateReplicatorDiscoveryCategory = "ServiceLocatorStateReplicator";
+
+class ServiceLocatorStateItem
+{
+ string key;
+};
+
+sequence<ServiceLocatorStateItem> ServiceLocatorStateItemSeq;
+
+interface ServiceLocatorStateReplicatorListener
+{
+ idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+ idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext,
+ ServiceLocatorStateItemSeq items);
+};
+
+interface ServiceLocatorStateReplicator
+{
+ idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext,
+ ServiceLocatorStateReplicatorListener *listener);
+ idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext,
+ ServiceLocatorStateReplicatorListener *listener);
+ idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext,
+ ServiceLocatorStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
+ idempotent ServiceLocatorStateItemSeq getState(Ice::StringSeq itemKeys);
+ idempotent ServiceLocatorStateItemSeq getAllState();
+};
+
+class ServiceLocatorServiceStateItem extends ServiceLocatorStateItem
+{
+ bool suspended;
+ Object *service;
+ Ice::Identity managementIdentity;
+ string guid;
+};
+
+class ServiceLocatorParamsStateItem extends ServiceLocatorStateItem
+{
+ string serviceKey;
+ AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams params;
+ string compareGuid;
+};
+
+class ServiceLocatorComparatorStateItem extends ServiceLocatorStateItem
+{
+ string name;
+ AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsCompare *comparator;
+};
}; /* module V1 */
diff --git a/src/ServiceLocator.cpp b/src/ServiceLocator.cpp
index 014805d..2a647ed 100644
--- a/src/ServiceLocator.cpp
+++ b/src/ServiceLocator.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,6 +14,11 @@
* at the top of the source tree.
*/
+//
+// These are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
+#include <boost/thread.hpp>
+
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <IceBox/IceBox.h>
@@ -25,19 +30,24 @@
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/CollocatedIceStorm/CollocatedIceStorm.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "ServiceLocatorManagement.h"
#include "ServiceManagement.h"
#include "ServiceLocatorStateReplicator.h"
-using namespace std;
-using namespace AsteriskSCF;
-using namespace AsteriskSCF::System::Discovery;
-using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::ServiceLocator::V1;
using namespace AsteriskSCF::ServiceDiscovery;
using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Discovery;
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF;
+using namespace std;
namespace
{
@@ -55,14 +65,26 @@ public:
const Ice::CommunicatorPtr& communicator,
const Ice::StringSeq& args);
void stop();
+ void onStandby();
+ void activated();
private:
+ void verifyProperties();
+
Ice::ObjectAdapterPtr mLocalAdapter;
Ice::ObjectAdapterPtr mDiscoveryAdapter;
Ice::ObjectAdapterPtr mManagementAdapter;
AsteriskSCF::CollocatedIceStorm::CollocatedIceStormPtr mIceStorm;
ReplicaPtr mReplicaService;
ServiceLocatorStateReplicatorPrx mStateReplicator;
+ ServiceLocatorManagementImplPtr mLocatorServiceManagement;
+ IceStorm::TopicManagerPrx mTopicManager;
+ ServiceLocatorStateReplicatorListenerPrx mReplicatorListenerProxy;
+ bool mStandalone;
+ Ice::CommunicatorPtr mCommunicator;
+ string mBackplaneAdapterName;
+ string mDiscoveryAdapterName;
+ string mManagementAdapterName;
};
/**
@@ -71,53 +93,160 @@ private:
class ReplicaImpl : public Replica
{
public:
- ReplicaImpl(Ice::ObjectAdapterPtr adapter) : mAdapter(adapter), mPaused(false), mActive(true) { }
+ ReplicaImpl(Ice::ObjectAdapterPtr adapter, bool active) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mAdapter(adapter),
+ mActive(active),
+ mServiceLocatorApp(0) { }
bool isActive(const Ice::Current&)
{
+ // naturally idempotent
+ boost::shared_lock<boost::shared_mutex> lock(mMutex);
return mActive;
}
- bool activate(const Ice::Current&)
+ bool activate(const OperationContextPtr& context, const Ice::Current&)
{
- mActive = true;
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
- for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+ if (!data)
+ {
+ // retry detected
+ return true; // always returns true.
+ }
+
+ try
+ {
+ lg(Info) << "ServiceLocator activate() received...";
+
+ // it's not good to loop through the listeners while holding the lock, so make a copy
+ vector<ReplicaListenerPrx> listeners;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ mActive = true;
+ listeners = mListeners;
+ }
+
+ if (mServiceLocatorApp)
+ {
+ lg(Info) << "ServiceLocator activate() calling the app's callback...";
+ mServiceLocatorApp->activated();
+ }
+
+ lg(Info) << "ServiceLocator activate() notifying listeners...";
+ for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
+ listener != listeners.end();
+ ++listener)
+ {
+ (*listener)->activated(AsteriskSCF::Operations::createContext(context),
+ ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+ }
+
+ lg(Info) << "ServiceLocator activated.";
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
{
- (*listener)->activated(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+ data->setException();
+ throw;
}
+ data->setCompleted();
+
return true;
}
- void standby(const Ice::Current&)
+ void standby(const OperationContextPtr& context, const Ice::Current&)
{
- mActive = false;
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
- for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+ if (!data)
{
- (*listener)->onStandby(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+ // retry detected
+ return;
+ }
+
+ try
+ {
+ // it's not good to loop through the listeners while holding the lock, so make a copy
+ vector<ReplicaListenerPrx> listeners;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
+ mActive = false;
+ listeners = mListeners;
+ }
+
+ if (mServiceLocatorApp)
+ {
+ mServiceLocatorApp->onStandby();
+ }
+
+ for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
+ listener != listeners.end();
+ ++listener)
+ {
+ (*listener)->onStandby(AsteriskSCF::Operations::createContext(context),
+ ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+ }
}
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ data->setException();
+ throw;
+ }
+
+ data->setCompleted();
}
- void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+ void addListener(const OperationContextPtr& context, const ReplicaListenerPrx& listener, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
mListeners.push_back(listener);
}
- void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+ void removeListener(const OperationContextPtr& context, const ReplicaListenerPrx& listener, const Ice::Current&)
{
+ if (!mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mMutex);
mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
}
+ void setServiceLocatorApp(ServiceLocatorApp* serviceLocatorApp)
+ {
+ mServiceLocatorApp = serviceLocatorApp;
+ }
+
private:
- Ice::ObjectAdapterPtr mAdapter;
+ boost::shared_mutex mMutex;
- vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+ OperationContextCachePtr mOperationContextCache;
- bool mPaused;
+ Ice::ObjectAdapterPtr mAdapter;
+
+ vector<ReplicaListenerPrx> mListeners;
bool mActive;
+
+ ServiceLocatorApp* mServiceLocatorApp; // Plain pointer to avoid smart pointer circular refs.
};
/**
@@ -140,13 +269,14 @@ public:
*/
void locateAll_async(const AMD_ServiceLocator_locateAllPtr&,
const ServiceLocatorParamsPtr&, const ::Ice::Current&);
+
private:
/**
* A pointer to the ServiceManagement implementation as that is where everything is
* actually stored. Our ServiceLocator implementation simply acts as a read-only
* frontend to it.
*/
- ServiceLocatorManagementImplPtr mLocatorServiceManagement;
+ const ServiceLocatorManagementImplPtr mLocatorServiceManagement;
};
}
@@ -156,6 +286,7 @@ void ServiceLocatorImpl::locate_async(const AMD_ServiceLocator_locatePtr& cb,
const ::Ice::Current&)
{
// delegate to the management object, where the data is kept
+ // mLocatorServiceManagement is const; no lock needed
mLocatorServiceManagement->locate(cb, params);
}
@@ -164,17 +295,68 @@ void ServiceLocatorImpl::locateAll_async(const AMD_ServiceLocator_locateAllPtr&
const ::Ice::Current&)
{
// delegate to the management object, where the data is kept
+ // mLocatorServiceManagement is const; no lock needed
mLocatorServiceManagement->locateAll(cb, params);
}
+/**
+ * Checks the Ice thread pool properties on mCommunicator before any adapter is created.
+ *
+ * For each of the discovery, management and backplane adapters: if the adapter's own
+ * "<adapter>.ThreadPool.Size" property is below the minimum of 4 and the communicator-wide
+ * "Ice.ThreadPool.Server.Size" is also below 4, the adapter's property is set to 4 and an
+ * Info message is logged.
+ *
+ * The client pool ("Ice.ThreadPool.Client.Size") is not adjusted here; if it is below 4 a
+ * Warning is logged and assert(false) fires.
+ *
+ * Expects mCommunicator and the three adapter name members to have been set already.
+ */
+void ServiceLocatorApp::verifyProperties()
+{
+ const int defaultSize(4);
+ string strDefaultSize = boost::lexical_cast<std::string>(defaultSize);
+
+ // Communicator-wide server pool size; read as 0 when the property is not set.
+ Ice::Int defaultPoolSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0);
+
+ // NOTE: The ServiceLocator, unlike other components, uses two adapters for its primary services: management, and discovery.
+
+ if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mDiscoveryAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+ {
+ if (defaultPoolSize < defaultSize)
+ {
+ lg(Info) << "Configured thread pool size for " << mDiscoveryAdapterName + " is too small, defaulting to " << strDefaultSize;
+ mCommunicator->getProperties()->setProperty(mDiscoveryAdapterName + ".ThreadPool.Size", strDefaultSize);
+ }
+ }
+
+ if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mManagementAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+ {
+ if (defaultPoolSize < defaultSize)
+ {
+ lg(Info) << "Configured thread pool size for " << mManagementAdapterName + " is too small, defaulting to " << strDefaultSize;
+ mCommunicator->getProperties()->setProperty(mManagementAdapterName + ".ThreadPool.Size", strDefaultSize);
+ }
+ }
+
+ if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mBackplaneAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+ {
+ if (defaultPoolSize < defaultSize)
+ {
+ lg(Info) << "Configured Internal thread pool size for " << mBackplaneAdapterName + " is too small, defaulting to " << strDefaultSize;
+ mCommunicator->getProperties()->setProperty(mBackplaneAdapterName + ".ThreadPool.Size", strDefaultSize);
+ }
+ }
+
+ int clientSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 0);
+ if (clientSize < defaultSize)
+ {
+ // Report the client pool value that was actually checked, not the server pool size.
+ lg(Warning) << "Client thread pool size of " << clientSize << " is too small! Set to " << strDefaultSize << " or greater.";
+ assert(false);
+ }
+}
+
void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr& communicator,
const Ice::StringSeq&)
{
- mIceStorm = new AsteriskSCF::CollocatedIceStorm::CollocatedIceStorm(appName, communicator->getProperties());
+ mCommunicator = communicator;
+
+ mBackplaneAdapterName = appName + ".BackplaneAdapter";
+ mDiscoveryAdapterName = appName + ".Locator.ServiceAdapter";
+ mManagementAdapterName = appName + ".Management.ServiceAdapter";
+ verifyProperties();
- string backplaneAdapterName = appName + ".BackplaneAdapter";
- mLocalAdapter = communicator->createObjectAdapterWithEndpoints(backplaneAdapterName,
- communicator->getProperties()->getPropertyWithDefault(backplaneAdapterName + ".Endpoints", "tcp -p 4410"));
+ mIceStorm = new AsteriskSCF::CollocatedIceStorm::CollocatedIceStorm(appName, communicator->getProperties());
+
+ mLocalAdapter = communicator->createObjectAdapterWithEndpoints(mBackplaneAdapterName,
+ communicator->getProperties()->getPropertyWithDefault(mBackplaneAdapterName + ".Endpoints", "tcp -p 4410"));
ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLocalAdapter);
@@ -186,11 +368,11 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
/* Talk to the topic manager to either create or get the service discovery topic,
* configured or default */
- IceStorm::TopicManagerPrx topicManager = mIceStorm->createTopicManagerProxy(communicator);
+ mTopicManager = mIceStorm->createTopicManagerProxy(communicator);
EventsPrx serviceDiscoveryTopic;
- if (topicManager)
+ if (mTopicManager)
{
Ice::PropertiesPtr props = communicator->getProperties();
string topicName = props->getProperty(appName + ".TopicName");
@@ -203,13 +385,13 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
IceStorm::TopicPrx topic;
try
{
- topic = topicManager->retrieve(topicName);
+ topic = mTopicManager->retrieve(topicName);
}
catch (const IceStorm::NoSuchTopic&)
{
try
{
- topic = topicManager->create(topicName);
+ topic = mTopicManager->create(topicName);
}
catch (const IceStorm::TopicExists&)
{
@@ -226,22 +408,22 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
lg(Info) << "IceStorm topic manager proxy not present, events disabled.";
}
- mReplicaService = new ReplicaImpl(mLocalAdapter);
+ mStandalone = getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false);
+ ReplicaImpl* replicaImpl = new ReplicaImpl(mLocalAdapter, mStandalone);
+ mReplicaService = replicaImpl;
mLocalAdapter->add(mReplicaService, communicator->stringToIdentity(ReplicaServiceId));
/* Management and discovery use separate adapters to provide a level of security,
* management may want to be protected so arbitrary people can't inject bad services
* into the infrastructure while discovery as a read only function may be allowed to all.
*/
+ mManagementAdapter = communicator->createObjectAdapterWithEndpoints(mManagementAdapterName,
+ communicator->getProperties()->getPropertyWithDefault(mManagementAdapterName + ".Endpoints", "tcp -p 4412"));
- string managementAdapterName = appName + ".Management.ServiceAdapter";
- mManagementAdapter = communicator->createObjectAdapterWithEndpoints(managementAdapterName,
- communicator->getProperties()->getPropertyWithDefault(managementAdapterName + ".Endpoints", "tcp -p 4412"));
-
- ServiceLocatorManagementImplPtr locatorServiceManagement =
+ mLocatorServiceManagement =
new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic, mReplicaService);
- mManagementAdapter->add(locatorServiceManagement,
+ mManagementAdapter->add(mLocatorServiceManagement,
communicator->stringToIdentity("LocatorServiceManagement"));
mManagementAdapter->activate();
@@ -249,20 +431,27 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
try
{
mStateReplicator = ServiceLocatorStateReplicatorPrx::checkedCast(communicator->propertyToProxy(appName + ".StateReplicator.Proxy"));
- ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(locatorServiceManagement);
- ServiceLocatorStateReplicatorListenerPrx replicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
+ ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(mLocatorServiceManagement);
+ mReplicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
- locatorServiceManagement->setStateReplicator(mStateReplicator);
+ mLocatorServiceManagement->setStateReplicator(mStateReplicator);
- if (getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false))
+ if (mStandalone)
{
- mReplicaService->standby();
- mStateReplicator->addListener(replicatorListenerProxy);
- lg(Info) << "Operating as a standby replica." << endl;
+ lg(Info) << "Operating in an active, standalone state and pushing updates." << endl;
}
- else
+ else
{
- lg(Info) << "Operating in an active state and pushing updates." << endl;
+ // The default state is standby, in a replica group.
+ // In this state, the component must be activated via its Replica interface.
+ if (mStateReplicator == 0)
+ {
+ lg(Error) << "Operating in replica group with no access to state replicator defined!" << endl;
+ lg(Error) << "--- Check config file for <servicename>.StateReplicator.Proxy setting, or run standalone." << endl;
+ assert(false);
+ }
+
+ lg(Info) << "Operating as a standby replica." << endl;
}
}
catch (const std::exception& e)
@@ -275,34 +464,67 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
lg(Warning) << "Operating in an active and standalone state." << endl;
}
- lg(Info) << "Activated service discovery management.";
+ lg(Info) << "Publishing service discovery management.";
- string locatorAdapterName = appName + ".Locator.ServiceAdapter";
- mDiscoveryAdapter = communicator->createObjectAdapterWithEndpoints(locatorAdapterName,
- communicator->getProperties()->getPropertyWithDefault(locatorAdapterName + ".Endpoints", "tcp -p 4411"));
+ mDiscoveryAdapter = communicator->createObjectAdapterWithEndpoints(mDiscoveryAdapterName,
+ communicator->getProperties()->getPropertyWithDefault(mDiscoveryAdapterName + ".Endpoints", "tcp -p 4411"));
- ServiceLocatorPtr locatorService = new ServiceLocatorImpl(locatorServiceManagement);
+ ServiceLocatorPtr locatorService = new ServiceLocatorImpl(mLocatorServiceManagement);
mDiscoveryAdapter->add(locatorService, communicator->stringToIdentity("LocatorService"));
- mDiscoveryAdapter->addFacet(locatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+ mDiscoveryAdapter->addFacet(mLocatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+
+ lg(Info) << "Publishing service discovery.";
+
+ replicaImpl->setServiceLocatorApp(this);
+
+ mDiscoveryAdapter->activate();
// Make our IceStorm topic manager available to all if we are the active service. This is because by adding it
// we will replicate it.
if (mReplicaService->isActive() == true)
{
- ServiceManagementPrx icestormManagement = locatorServiceManagement->addService(topicManager, "TopicManager", Ice::Current());
- ServiceLocatorParamsPtr params = new ServiceLocatorParams;
- params->category = TopicManagerCategory;
- params->service = "default";
- icestormManagement->addLocatorParams(params, "");
+ activated();
+ lg(Info) << "Waiting for requests.";
+ return;
}
- mDiscoveryAdapter->activate();
+ onStandby();
+ lg(Info) << "In standby mode.";
+}
- lg(Info) << "Activated service discovery.";
+void ServiceLocatorApp::activated()
+{
+ lg(Info) << "App activated callback entered.";
- lg(Info) << "Waiting for requests.";
+ if (!mStandalone)
+ {
+
+ lg(Info) << " Removing listener from Replicator";
+ // Stop listening to state replicator.
+ mStateReplicator->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
+ }
+
+ lg(Info) << " Creating topic manager";
+ ServiceManagementPrx icestormManagement = mLocatorServiceManagement->addService(
+ AsteriskSCF::Operations::createContext(), mTopicManager, "TopicManager", Ice::Current());
+ ServiceLocatorParamsPtr params = new ServiceLocatorParams;
+ params->category = TopicManagerCategory;
+ params->service = "default";
+
+ lg(Info) << " Adding locator params for topic manager";
+ icestormManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
+ lg(Info) << "Activated callback complete";
+}
+
+void ServiceLocatorApp::onStandby()
+{
+ if (!mStandalone)
+ {
+ // Listen to state replicator.
+ mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
+ }
}
void ServiceLocatorApp::stop()
@@ -323,10 +545,7 @@ void ServiceLocatorApp::stop()
mIceStorm->stop();
}
-extern "C"
-{
-ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
+extern "C" ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
{
return new ServiceLocatorApp;
}
-}
diff --git a/src/ServiceLocatorManagement.cpp b/src/ServiceLocatorManagement.cpp
index 0a953c8..11e31dd 100644
--- a/src/ServiceLocatorManagement.cpp
+++ b/src/ServiceLocatorManagement.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,29 +14,35 @@
* at the top of the source tree.
*/
-#include <Ice/Ice.h>
-#include <IceUtil/UUID.h>
-
+//
+// These are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
#include <boost/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
+
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Async/ResponseCollector.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "ServiceLocatorStateReplicationIf.h"
#include "ServiceLocatorManagement.h"
#include "ServiceManagement.h"
-using namespace std;
-using namespace AsteriskSCF::System::Discovery;
-using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::ServiceLocator::V1;
using namespace AsteriskSCF::ServiceDiscovery;
+using namespace AsteriskSCF::System::Discovery;
+using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF;
+using namespace std;
//
// Normally would use an anonymous namespace here, but there is a compiler
@@ -47,6 +53,9 @@ namespace ServiceLocatorManagementImplNS
const Logger& lg = getLoggerFactory().getLogger("AsteriskSCF.System.Discovery");
+typedef ContextResultData<ServiceManagementPrx> AddServiceResultData;
+typedef boost::shared_ptr<AddServiceResultData> AddServiceResultDataPtr;
+
/** Parameter type for Locator, to allow use of ResponseCollector. */
typedef std::pair<bool, ServiceManagementImplPtr> LocateParam;
@@ -317,9 +326,10 @@ public:
ServiceLocatorManagementImplPriv(const Ice::ObjectAdapterPtr& adapter,
const EventsPrx& serviceDiscoveryTopic,
const AsteriskSCF::System::Component::V1::ReplicaPtr replicaService) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic), mReplicaService(replicaService)
{
- };
+ }
/**
* Shared mutex lock which protects the services and comparators.
@@ -327,6 +337,11 @@ public:
boost::shared_mutex mLock;
/**
+ * Context cache for retry detection.
+ */
+ OperationContextCachePtr mOperationContextCache;
+
+ /**
* Object adapter that our service management proxies originate from, it is
* houses the main management service.
*/
@@ -420,8 +435,16 @@ void ServiceLocatorManagementImpl::locateAll(
* Implementation of the addService method as defined in service_locator.ice
*/
ServiceManagementPrx ServiceLocatorManagementImpl::addService(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
const Ice::ObjectPrx& service, const string& guid, const Ice::Current&)
{
+ std::pair<bool, AddServiceResultDataPtr> cacheHit = getContextSync<AddServiceResultDataPtr>(mImpl->mOperationContextCache, context);
+
+ if (cacheHit.first)
+ {
+ return cacheHit.second->getResult();
+ }
+
lg(Debug) << "addService(" << guid << ')';
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
@@ -434,10 +457,12 @@ ServiceManagementPrx ServiceLocatorManagementImpl::addService(
mImpl->mServices.push_back(new_service);
- return new_service->getServiceManagementPrx();
+ ServiceManagementPrx r = new_service->getServiceManagementPrx();
+ cacheHit.second->setResult(r);
+ return r;
}
-ServiceManagementImplPtr ServiceLocatorManagementImpl::addService(const Ice::ObjectPrx& service,
+ServiceManagementImplPtr ServiceLocatorManagementImpl::addService(const Ice::ObjectPrx& service,
const std::string& guid, const Ice::Identity& identity)
{
lg(Debug) << "addService(" << guid << ')';
@@ -484,42 +509,87 @@ ServiceInfo ServiceLocatorManagementImpl::getService(const std::string &guid, co
/**
* Implementation of the addCompare method as defined in service_locator.ice
*/
-void ServiceLocatorManagementImpl::addCompare(const string& guid,
+void ServiceLocatorManagementImpl::addCompare(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& guid,
const ServiceLocatorParamsComparePrx& service, const Ice::Current&)
{
- lg(Debug) << "addCompare(" << guid << ')';
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
-
- pair<map<string, ServiceLocatorParamsComparePrx>::iterator, bool> insertPair =
- mImpl->mCompares.insert(make_pair(guid, service));
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
- if (insertPair.second == false)
+ if (!data)
{
- throw DuplicateCompare();
+ // retry detected
+ return;
}
- if (mImpl->mLocatorTopic)
+ try
+ {
+ lg(Debug) << "addCompare(" << guid << ')';
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ pair<map<string, ServiceLocatorParamsComparePrx>::iterator, bool> insertPair =
+ mImpl->mCompares.insert(make_pair(guid, service));
+
+ if (insertPair.second == false)
+ {
+ throw DuplicateCompare();
+ }
+
+ if (mImpl->mLocatorTopic)
+ {
+ mImpl->mLocatorTopic->comparisonRegistered(AsteriskSCF::Operations::createContext(context), guid);
+ }
+ data->setCompleted();
+ }
+ catch(const std::exception& e)
{
- mImpl->mLocatorTopic->comparisonRegistered(guid);
+ data->setException(e);
+ throw;
+ }
+ catch(...)
+ {
+ data->setException();
+ throw;
}
}
/**
* Implementation of the removeCompare method as defined in service_locator.ice
*/
-void ServiceLocatorManagementImpl::removeCompare(const string& guid, const Ice::Current&)
+void ServiceLocatorManagementImpl::removeCompare(const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const string& guid, const Ice::Current&)
{
- lg(Debug) << "removeCompare(" << guid << ')';
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
- std::map<std::string, ServiceLocatorParamsComparePrx>::size_type erased = mImpl->mCompares.erase(guid);
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
- if (!erased)
+ if (!data)
{
- throw CompareNotFound();
+ // retry detected
+ return;
}
- else if (mImpl->mLocatorTopic)
+
+ try
+ {
+ lg(Debug) << "removeCompare(" << guid << ')';
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ std::map<std::string, ServiceLocatorParamsComparePrx>::size_type erased = mImpl->mCompares.erase(guid);
+
+ if (!erased)
+ {
+ throw CompareNotFound();
+ }
+ else if (mImpl->mLocatorTopic)
+ {
+ mImpl->mLocatorTopic->comparisonUnregistered(AsteriskSCF::Operations::createContext(context), guid);
+ }
+ data->setCompleted();
+ }
+ catch(const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch(...)
{
- mImpl->mLocatorTopic->comparisonUnregistered(guid);
+ data->setException();
+ throw;
}
}
@@ -612,7 +682,7 @@ void ServiceLocatorManagementImpl::replicateState(const ServiceLocatorStateItemP
try
{
ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
- oneway->setState(items);
+ oneway->setState(AsteriskSCF::Operations::createContext(), items);
}
catch (const Ice::NoEndpointException&)
{
@@ -642,7 +712,7 @@ void ServiceLocatorManagementImpl::removeState(const ServiceLocatorStateItemPtr&
try
{
ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
- oneway->removeState(items);
+ oneway->removeState(AsteriskSCF::Operations::createContext(), items);
}
catch (const Ice::NoEndpointException&)
{
diff --git a/src/ServiceLocatorManagement.h b/src/ServiceLocatorManagement.h
index 468565f..a5aa052 100644
--- a/src/ServiceLocatorManagement.h
+++ b/src/ServiceLocatorManagement.h
@@ -87,15 +87,16 @@ public:
// AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagement interface.
//
AsteriskSCF::Core::Discovery::V1::ServiceManagementPrx addService(
+ const AsteriskSCF::System::V1::OperationContextPtr&,
const Ice::ObjectPrx&, const std::string&, const Ice::Current&);
AsteriskSCF::Core::Discovery::V1::ServiceInfoSeq getServices(
const ::Ice::Current&) const;
AsteriskSCF::Core::Discovery::V1::ServiceInfo getService(
const std::string &, const ::Ice::Current&) const;
- void addCompare(const std::string&,
+ void addCompare(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string&,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsComparePrx&,
const Ice::Current&);
- void removeCompare(const std::string&, const Ice::Current& = Ice::Current());
+ void removeCompare(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string&, const Ice::Current& = Ice::Current());
void isSupported(const std::string&,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&,
diff --git a/src/ServiceLocatorStateListener.cpp b/src/ServiceLocatorStateListener.cpp
index a549e3b..498d313 100644
--- a/src/ServiceLocatorStateListener.cpp
+++ b/src/ServiceLocatorStateListener.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,16 +14,17 @@
* at the top of the source tree.
*/
-#include <IceUtil/UUID.h>
-
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
-// #include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <IceUtil/UUID.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "ServiceLocatorManagement.h"
#include "ServiceManagement.h"
@@ -33,24 +34,26 @@
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Replication::ServiceLocator::V1;
using namespace AsteriskSCF::ServiceDiscovery;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
class ServiceLocatorStateReplicatorItem
{
public:
ServiceLocatorStateReplicatorItem(const ServiceLocatorManagementImplPtr& locatorManagement) :
- mLocatorManagement(locatorManagement)
- {
+ mLocatorManagement(locatorManagement)
+ {
}
~ServiceLocatorStateReplicatorItem()
{
if (mService)
{
- mService->unregister();
+ mService->unregister(AsteriskSCF::Operations::createContext());
}
if (!mComparator.empty())
{
- mLocatorManagement->removeCompare(mComparator);
+ mLocatorManagement->removeCompare(AsteriskSCF::Operations::createContext(), mComparator);
}
}
@@ -88,93 +91,130 @@ private:
struct ServiceLocatorStateReplicatorListenerImpl
{
public:
- ServiceLocatorStateReplicatorListenerImpl(ServiceLocatorManagementImplPtr management)
- : mId(IceUtil::generateUUID()), mLocatorManagement(management) {}
+ ServiceLocatorStateReplicatorListenerImpl(ServiceLocatorManagementImplPtr management) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+ mId(IceUtil::generateUUID()), mLocatorManagement(management) {}
+
void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
{
+ boost::mutex::scoped_lock lock(mMutex);
for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
{
// Just erasing this from the map will cause the destructor to actually shut things down
mStateItems.erase((*key));
}
}
- void setStateNoticeImpl(const ServiceLocatorStateItemSeq& items)
+ void setStateNoticeImpl(const OperationContextPtr& context, const ServiceLocatorStateItemSeq& items)
{
- for (ServiceLocatorStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+ ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+
+ if (!data)
{
- ServiceLocatorServiceStateItemPtr serviceState;
- ServiceLocatorParamsStateItemPtr paramsState;
- ServiceLocatorComparatorStateItemPtr comparatorState;
-
- if ((serviceState = ServiceLocatorServiceStateItemPtr::dynamicCast((*item))))
- {
- std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
- boost::shared_ptr<ServiceLocatorStateReplicatorItem> localitem;
-
- if ((i == mStateItems.end()))
- {
- boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
- localitem = newitem;
- mStateItems.insert(std::make_pair((*item)->key, newitem));
- ServiceManagementImplPtr service = mLocatorManagement->addService(serviceState->service, serviceState->guid, serviceState->managementIdentity);
- newitem->setService(service);
- }
- else
- {
- localitem = i->second;
- }
-
- // The only thing that can be changed by a subsequent state item is the suspend status
- if (serviceState->suspended == true)
- {
- localitem->getService()->suspend();
- }
- else
- {
- localitem->getService()->unsuspend();
- }
- }
- else if ((paramsState = ServiceLocatorParamsStateItemPtr::dynamicCast((*item))))
- {
- // This is special, we have to find the respective service and then add parameters to it
- std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find(paramsState->serviceKey);
-
- if ((i == mStateItems.end()))
- {
- continue;
- }
-
- // Parameters are only ever added, they are never modified or removed
- i->second->getService()->addLocatorParams(paramsState->params, paramsState->compareGuid);
- }
- else if ((comparatorState = ServiceLocatorComparatorStateItemPtr::dynamicCast((*item))))
- {
- std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
-
- if ((i != mStateItems.end()))
- {
- // If this happens we essentially got a duplicate state item for something that should never change, so ignore it
- continue;
- }
-
- try
- {
- Ice::Current current;
- mLocatorManagement->addCompare(comparatorState->name, comparatorState->comparator, current);
- boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
- mStateItems.insert(std::make_pair((*item)->key, newitem));
- newitem->setComparator(comparatorState->name);
- }
- catch (...)
- {
- // It is possible for this to get reached if a comparator exists locally with the same name as the one we just tried to add
- }
- }
+ // retry detected
+ return;
}
+
+ boost::mutex::scoped_lock lock(mMutex);
+
+ try
+ {
+ for (ServiceLocatorStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+ {
+ ServiceLocatorServiceStateItemPtr serviceState;
+ ServiceLocatorParamsStateItemPtr paramsState;
+ ServiceLocatorComparatorStateItemPtr comparatorState;
+
+ if ((serviceState = ServiceLocatorServiceStateItemPtr::dynamicCast((*item))))
+ {
+ std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
+ boost::shared_ptr<ServiceLocatorStateReplicatorItem> localitem;
+
+ if ((i == mStateItems.end()))
+ {
+ boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
+ localitem = newitem;
+ mStateItems.insert(std::make_pair((*item)->key, newitem));
+ ServiceManagementImplPtr service = mLocatorManagement->addService(
+ serviceState->service, serviceState->guid, serviceState->managementIdentity);
+ newitem->setService(service);
+ }
+ else
+ {
+ localitem = i->second;
+ }
+
+ // The only thing that can be changed by a subsequent state item is the suspend status
+ if (serviceState->suspended == true)
+ {
+ localitem->getService()->suspend(AsteriskSCF::Operations::createContext(context));
+ }
+ else
+ {
+ localitem->getService()->unsuspend(AsteriskSCF::Operations::createContext(context));
+ }
+ }
+ else if ((paramsState = ServiceLocatorParamsStateItemPtr::dynamicCast((*item))))
+ {
+ // This is special, we have to find the respective service and then add parameters to it
+ std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find(paramsState->serviceKey);
+
+ if ((i == mStateItems.end()))
+ {
+ continue;
+ }
+
+ // Parameters are only ever added, they are never modified or removed
+ i->second->getService()->addLocatorParams(AsteriskSCF::Operations::createContext(context),
+ paramsState->params, paramsState->compareGuid);
+ }
+ else if ((comparatorState = ServiceLocatorComparatorStateItemPtr::dynamicCast((*item))))
+ {
+ std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
+
+ if ((i != mStateItems.end()))
+ {
+ // If this happens we essentially got a duplicate state item for something that should never change, so ignore it
+ continue;
+ }
+
+ try
+ {
+ Ice::Current current;
+ mLocatorManagement->addCompare(AsteriskSCF::Operations::createContext(context), comparatorState->name, comparatorState->comparator, current);
+ boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
+ mStateItems.insert(std::make_pair((*item)->key, newitem));
+ newitem->setComparator(comparatorState->name);
+ }
+ catch (...)
+ {
+ // It is possible for this to get reached if a comparator exists locally with the same name as the one we just tried to add
+ }
+ }
+ }
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ assert(false);
+ data->setException();
+ throw;
+ }
+
+ data->setCompleted();
}
- std::string mId;
+
+ const std::string& getId() const { return mId; }
+
+private:
+ boost::mutex mMutex;
+ OperationContextCachePtr mOperationContextCache;
+ const std::string mId;
std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> > mStateItems;
- ServiceLocatorManagementImplPtr mLocatorManagement;
+ const ServiceLocatorManagementImplPtr mLocatorManagement;
};
ServiceLocatorStateReplicatorListenerI::ServiceLocatorStateReplicatorListenerI(const ServiceLocatorManagementImplPtr& management)
@@ -182,17 +222,18 @@ ServiceLocatorStateReplicatorListenerI::ServiceLocatorStateReplicatorListenerI(c
{
}
-void ServiceLocatorStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
+void ServiceLocatorStateReplicatorListenerI::stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq& itemKeys, const Ice::Current&)
{
+ // naturally idempotent
mImpl->removeStateNoticeImpl(itemKeys);
}
-void ServiceLocatorStateReplicatorListenerI::stateSet(const ServiceLocatorStateItemSeq& items, const Ice::Current&)
+void ServiceLocatorStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr& context, const ServiceLocatorStateItemSeq& items, const Ice::Current&)
{
- mImpl->setStateNoticeImpl(items);
+ mImpl->setStateNoticeImpl(context, items);
}
bool ServiceLocatorStateReplicatorListenerI::operator==(const ServiceLocatorStateReplicatorListenerI &rhs)
{
- return mImpl->mId == rhs.mImpl->mId;
+ return mImpl->getId() == rhs.mImpl->getId();
}
diff --git a/src/ServiceLocatorStateReplicator.h b/src/ServiceLocatorStateReplicator.h
index 8b64b67..c24dd2e 100644
--- a/src/ServiceLocatorStateReplicator.h
+++ b/src/ServiceLocatorStateReplicator.h
@@ -41,8 +41,8 @@ class ServiceLocatorStateReplicatorListenerI :
{
public:
ServiceLocatorStateReplicatorListenerI(const AsteriskSCF::ServiceDiscovery::ServiceLocatorManagementImplPtr&);
- void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
- void stateSet(const AsteriskSCF::Replication::ServiceLocator::V1::ServiceLocatorStateItemSeq&, const Ice::Current&);
+ void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+ void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Replication::ServiceLocator::V1::ServiceLocatorStateItemSeq&, const Ice::Current&);
bool operator==(const ServiceLocatorStateReplicatorListenerI& rhs);
private:
boost::shared_ptr<ServiceLocatorStateReplicatorListenerImpl> mImpl;
diff --git a/src/ServiceLocatorStateReplicatorApp.cpp b/src/ServiceLocatorStateReplicatorApp.cpp
index ecc505f..3035efc 100644
--- a/src/ServiceLocatorStateReplicatorApp.cpp
+++ b/src/ServiceLocatorStateReplicatorApp.cpp
@@ -14,6 +14,11 @@
* at the top of the source tree.
*/
+//
+// These includes are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
+#include <boost/thread.hpp>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
#include <IceStorm/IceStorm.h>
@@ -65,17 +70,17 @@ public:
ComponentServiceImpl(ServiceLocatorStateReplicatorService &service) : mService(service) {}
public: // Overrides of the ComponentService interface.
- virtual void suspend(const ::Ice::Current& = ::Ice::Current())
+ virtual void suspend(const AsteriskSCF::System::V1::OperationContextPtr&, const ::Ice::Current& = ::Ice::Current())
{
// TBD
}
- virtual void resume(const ::Ice::Current& = ::Ice::Current())
+ virtual void resume(const AsteriskSCF::System::V1::OperationContextPtr&, const ::Ice::Current& = ::Ice::Current())
{
// TBD
}
- virtual void shutdown(const ::Ice::Current& = ::Ice::Current())
+ virtual void shutdown(const AsteriskSCF::System::V1::OperationContextPtr&, const ::Ice::Current& = ::Ice::Current())
{
// TBD
}
diff --git a/src/ServiceManagement.cpp b/src/ServiceManagement.cpp
index fac2e48..19bc1b7 100644
--- a/src/ServiceManagement.cpp
+++ b/src/ServiceManagement.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,29 +14,38 @@
* at the top of the source tree.
*/
+//
+// These includes are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
+#include <boost/thread.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
-#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
#include <boost/shared_ptr.hpp>
-#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Async/ResponseCollector.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
+#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Async/ResponseCollector.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "ServiceLocatorStateReplicationIf.h"
#include "ServiceLocatorManagement.h"
#include "ServiceManagement.h"
-using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Operations;
using namespace AsteriskSCF::Replication::ServiceLocator::V1;
using namespace AsteriskSCF::ServiceDiscovery;
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF;
+using namespace std;
namespace
{
@@ -101,6 +110,7 @@ public:
ServiceManagementImplPriv(ServiceManagementImpl* impl, ServiceLocatorManagementImplPtr management,
const Ice::ObjectPrx& service, const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::System::Discovery::EventsPrx& serviceDiscoveryTopic,
const string& guid, const Ice::Identity& identity) :
+ mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mStateItem(new ServiceLocatorServiceStateItem()),
mManagement(management), mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic)
{
@@ -112,7 +122,7 @@ public:
mStateItem->managementIdentity = identity;
if (mLocatorTopic)
{
- mLocatorTopic->serviceRegistered(guid);
+ mLocatorTopic->serviceRegistered(AsteriskSCF::Operations::createContext(), guid);
}
mManagement->replicateState(mStateItem);
}
@@ -122,6 +132,8 @@ public:
*/
boost::shared_mutex mLock;
+ OperationContextCachePtr mOperationContextCache;
+
/**
* Service state replication item.
*/
@@ -275,8 +287,8 @@ void ServiceManagementImpl::isSupported(const ServiceLocatorParamsPtr& params, c
{
if ((*spec)->isSupported(params, myCallback))
{
- // If we get here, a match was found without needing to call an external comparator.
- // We are done.
+ // If we get here, a match was found without needing to call an external comparator.
+ // We are done.
break;
}
}
@@ -295,13 +307,13 @@ const std::string& ServiceManagementImpl::getGuid() const
* that is trying to be found.
* @param callback Callback to asynchronously receive the results.
*
- * @return True only if we determined a match without any need for calling a custom comparator.
- * Otherwise, the callback may be accumulating the results, or it's not a match.
+ * @return True only if we determined a match without any need for calling a custom comparator.
+ * Otherwise, the callback may be accumulating the results, or it's not a match.
*/
bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params, const IsSupportedCallbackPtr& callback)
{
- // Does the component doing the locate
- // want to filter based on category.
+ // Does the component doing the locate
+ // want to filter based on category?
if (!params->category.empty())
{
// Is this the wrong category?
@@ -314,18 +326,18 @@ bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params
}
}
- // Does the component doing the locate
- // want all services in the category?
+ // Does the component doing the locate
+ // want all services in the category?
if (params->service.empty())
{
- // If a comparator was provided then yield to it.
+ // If a comparator was provided then yield to it.
if (!mStateItem->compareGuid.empty())
{
mManagement->isSupported(mStateItem->compareGuid, params, callback);
return false;
}
- // Ignore the id and treat this as a wildcard search.
+ // Ignore the id and treat this as a wildcard search.
lg(Trace) << " ...isSupported" << debugPrintParams(params) + " = true. Category match explicit, wildcard match service.";
callback->result(true);
@@ -352,15 +364,15 @@ bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params
}
}
- // If a comparator was provided then yield to it.
+ // If a comparator was provided then yield to it.
if (!mStateItem->compareGuid.empty())
{
mManagement->isSupported(mStateItem->compareGuid, params, callback);
return false;
}
- // If we get here we have a match on service and id.
- // (and category, if one was passed in.)
+ // If we get here we have a match on service and id.
+ // (and category, if one was passed in.)
lg(Trace) << " ...isSupported" << debugPrintParams(params) + " = true";
callback->result(true);
return true;
@@ -369,8 +381,16 @@ bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params
/**
* Implementation of the addLocatorParams method as defined in service_locator.ice
*/
-void ServiceManagementImpl::addLocatorParams(const ServiceLocatorParamsPtr& params, const std::string& compareGuid, const Ice::Current&)
+void ServiceManagementImpl::addLocatorParams(
+ const AsteriskSCF::System::V1::OperationContextPtr& context,
+ const ServiceLocatorParamsPtr& params, const std::string& compareGuid, const Ice::Current&)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(context))
+ {
+ // retry detected
+ return;
+ }
+
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
boost::shared_ptr<ServiceLocatorParamsSpec> spec(new ServiceLocatorParamsSpec(params, compareGuid, mImpl->mManagement, mImpl->mStateItem));
@@ -381,41 +401,87 @@ void ServiceManagementImpl::addLocatorParams(const ServiceLocatorParamsPtr& para
/**
* Implementation of the suspend method as defined in service_locator.ice
*/
-void ServiceManagementImpl::suspend(const Ice::Current&)
+void ServiceManagementImpl::suspend(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current&)
{
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
- if (!mImpl->mStateItem->suspended)
+ if (!data)
{
- lg(Info) << "Suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
- mImpl->mStateItem->suspended = true;
- mImpl->mManagement->replicateState(mImpl->mStateItem);
+ // retry detected
+ return;
}
- if (mImpl->mLocatorTopic)
+ try
{
- mImpl->mLocatorTopic->serviceSuspended(mImpl->mStateItem->guid);
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ if (!mImpl->mStateItem->suspended)
+ {
+ lg(Info) << "Suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
+ mImpl->mStateItem->suspended = true;
+ mImpl->mManagement->replicateState(mImpl->mStateItem);
+ }
+
+ if (mImpl->mLocatorTopic)
+ {
+ mImpl->mLocatorTopic->serviceSuspended(AsteriskSCF::Operations::createContext(context), mImpl->mStateItem->guid);
+ }
+ data->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ data->setException(e);
+ throw;
+ }
+ catch (...)
+ {
+ assert(false);
+ data->setException();
+ throw;
}
}
/**
* Implementation of the unsuspend method as defined in service_locator.ice
*/
-void ServiceManagementImpl::unsuspend(const Ice::Current&)
+void ServiceManagementImpl::unsuspend(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current&)
{
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
- if (mImpl->mStateItem->suspended)
+ if (!data)
{
- lg(Info) << "Un-suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
- mImpl->mStateItem->suspended = false;
- mImpl->mManagement->replicateState(mImpl->mStateItem);
+ // retry detected
+ return;
}
- if (mImpl->mLocatorTopic)
+ try
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ if (mImpl->mStateItem->suspended)
+ {
+ lg(Info) << "Un-suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
+ mImpl->mStateItem->suspended = false;
+ mImpl->mManagement->replicateState(mImpl->mStateItem);
+ }
+
+ if (mImpl->mLocatorTopic)
+ {
+ mImpl->mLocatorTopic->serviceUnsuspended(context, mImpl->mStateItem->guid);
+ }
+ }
+ catch (const std::exception& e)
{
- mImpl->mLocatorTopic->serviceUnsuspended(mImpl->mStateItem->guid);
+ data->setException(e);
+ throw;
}
+ catch (...)
+ {
+ assert(false);
+ data->setException();
+ throw;
+ }
+ data->setCompleted();
}
ServiceStatus ServiceManagementImpl::getStatus() const
@@ -433,7 +499,6 @@ ServiceStatus ServiceManagementImpl::getStatus() const
ServiceLocatorParamsSeq ServiceManagementImpl::getLocatorParams(const Ice::Current&)
{
-
boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
ServiceLocatorParamsSeq result;
for (std::vector<boost::shared_ptr<ServiceLocatorParamsSpec> >::const_iterator iter = mImpl->mSupportedLocatorParams.begin();
@@ -447,8 +512,16 @@ ServiceLocatorParamsSeq ServiceManagementImpl::getLocatorParams(const Ice::Curre
/**
* Implementation of the unregister method as defined in service_locator.ice
*/
-void ServiceManagementImpl::unregister(const Ice::Current&)
+void ServiceManagementImpl::unregister(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current&)
{
+ ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
+
+ if (!data)
+ {
+ // retry detected
+ return;
+ }
+
/* You'll notice no lock here. That's because we aren't actually modifying any internal state that should
* be protected, and if we did lock here there is a chance for a deadlock which is super sad.
*/
@@ -464,19 +537,21 @@ void ServiceManagementImpl::unregister(const Ice::Current&)
if (mImpl->mLocatorTopic)
{
- mImpl->mLocatorTopic->serviceUnregistered(mImpl->mStateItem->guid);
+ mImpl->mLocatorTopic->serviceUnregistered(AsteriskSCF::Operations::createContext(context), mImpl->mStateItem->guid);
}
+
+ data->setCompleted();
}
catch(const std::exception& e)
{
lg(Error) << BOOST_CURRENT_FUNCTION << " : " << e.what();
+ data->setException(e);
throw;
}
catch(...)
{
lg(Error) << BOOST_CURRENT_FUNCTION << " : " << "Unknown exception.";
+ data->setException();
throw;
}
}
-
-
diff --git a/src/ServiceManagement.h b/src/ServiceManagement.h
index a1966d7..e8f0907 100644
--- a/src/ServiceManagement.h
+++ b/src/ServiceManagement.h
@@ -39,10 +39,11 @@ public:
//
// AsteriskSCF::Core::Discovery::V1::ServiceManagement interface.
//
- void addLocatorParams(const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&, const std::string&, const Ice::Current& = Ice::Current());
- void suspend(const Ice::Current& = Ice::Current());
- void unsuspend(const Ice::Current& = Ice::Current());
- void unregister(const Ice::Current& = Ice::Current());
+ void addLocatorParams(const AsteriskSCF::System::V1::OperationContextPtr&,
+ const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&, const std::string&, const Ice::Current& = Ice::Current());
+ void suspend(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::Current& = Ice::Current());
+ void unsuspend(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::Current& = Ice::Current());
+ void unregister(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::Current& = Ice::Current());
AsteriskSCF::Core::Discovery::V1::ServiceStatus getStatus(const Ice::Current&) const { return getStatus(); }
AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsSeq getLocatorParams(const Ice::Current&);
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index bbfa333..1b08f67 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,3 +1,4 @@
+include_directories(${astscf-ice-util-cpp_dir}/include)
astscf_component_init(service_locator_test)
astscf_component_add_files(service_locator_test TestServiceLocator.cpp)
astscf_component_add_files(service_locator_test TestComparatorBlocking.cpp)
@@ -6,5 +7,6 @@ astscf_component_add_boost_libraries(service_locator_test unit_test_framework th
astscf_component_add_slice_collection_libraries(service_locator_test ASTSCF)
astscf_component_build_icebox(service_locator_test)
astscf_test_icebox(service_locator_test config/test_component.conf)
+target_link_libraries(service_locator_test LoggingClient ASTSCFIceUtilCpp)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/AsteriskSCFIceStorm)
diff --git a/test/TestComparatorBlocking.cpp b/test/TestComparatorBlocking.cpp
index 7272313..234180b 100644
--- a/test/TestComparatorBlocking.cpp
+++ b/test/TestComparatorBlocking.cpp
@@ -34,6 +34,7 @@
#include <Ice/Ice.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
using namespace AsteriskSCF::Core::Discovery::V1;
@@ -149,7 +150,7 @@ public:
discovery(ServiceLocatorPrx::checkedCast(
discoveryCommunicator->stringToProxy(
"LocatorService:tcp -h 127.0.0.1 -p 4411"))),
- proxy(management->addService(management, "self")),
+ proxy(management->addService(AsteriskSCF::Operations::createContext(), management, "self")),
params(new ServiceLocatorParams())
{
BOOST_REQUIRE(blockerProxy != 0);
@@ -157,17 +158,17 @@ public:
BOOST_REQUIRE(discovery != 0);
blockerAdapter->activate();
- management->addCompare("blocker", blockerProxy);
+ management->addCompare(AsteriskSCF::Operations::createContext(), "blocker", blockerProxy);
params->category = "self";
- proxy->addLocatorParams(params, "blocker");
+ proxy->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "blocker");
}
~Fixture()
{
blocker->unblock();
- proxy->unregister();
- management->removeCompare("blocker");
+ proxy->unregister(AsteriskSCF::Operations::createContext());
+ management->removeCompare(AsteriskSCF::Operations::createContext(), "blocker");
blockerCommunicator->shutdown();
blockerCommunicator->waitForShutdown();
diff --git a/test/TestServiceLocator.cpp b/test/TestServiceLocator.cpp
index 6c698d0..aebc687 100644
--- a/test/TestServiceLocator.cpp
+++ b/test/TestServiceLocator.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -26,6 +26,8 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
@@ -102,7 +104,7 @@ public:
params->category = category;
params->service = service;
- // Actually do the locate request
+ // Actually do the locate request
foundCompare = ServiceLocatorParamsComparePrx::uncheckedCast(discovery->locate(params));
/* If we get here we didn't get an exception back telling us no service found... which is wrong! */
@@ -111,7 +113,7 @@ public:
{
found = false;
}
- catch (const Ice::Exception &e)
+ catch (const Ice::Exception& e)
{
BOOST_TEST_MESSAGE(e.ice_name());
BOOST_TEST_MESSAGE(e.what());
@@ -198,7 +200,7 @@ struct GlobalIceFixture
{
throw "Invalid service discovery proxy";
}
-
+
} // end Fixture() constructor
~GlobalIceFixture()
@@ -290,7 +292,8 @@ BOOST_AUTO_TEST_CASE(AddService)
*/
try
{
- testbed.compareManagement = testbed.management->addService(testbed.compare, "testcompare");
+ testbed.compareManagement = testbed.management->addService(AsteriskSCF::Operations::createContext(),
+ testbed.compare, "testcompare");
added = true;
}
catch (const Ice::Exception &e)
@@ -306,6 +309,19 @@ BOOST_AUTO_TEST_CASE(AddService)
}
/**
+ * Test retry logic for adding a service
+ */
+BOOST_AUTO_TEST_CASE(AddServiceRetry)
+{
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+ ServiceManagementPrx expected = testbed.management->addService(context, testbed.compare, "testcompare");
+ ServiceManagementPrx actual = testbed.management->addService(context, testbed.compare, "testcompare");
+
+ BOOST_CHECK_EQUAL(expected, actual);
+}
+
+/**
* Confirm that we can't find a service after we have added one but before we add parameters.
*/
BOOST_AUTO_TEST_CASE(ServiceNotFoundAfterAdd)
@@ -338,7 +354,7 @@ BOOST_AUTO_TEST_CASE(AddLocatorParamsWithoutCompareService)
params->category = "test";
params->service = "default";
- testbed.compareManagement->addLocatorParams(params, "");
+ testbed.compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
added = true;
}
@@ -354,6 +370,26 @@ BOOST_AUTO_TEST_CASE(AddLocatorParamsWithoutCompareService)
BOOST_CHECK(added);
}
+BOOST_AUTO_TEST_CASE(AddLocatorParamsRetry)
+{
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext(); // one context shared by both addLocatorParams calls
+
+ ServiceLocatorParamsPtr params = new ServiceLocatorParams;
+ params->category = "retry-test";
+ params->service = "default";
+
+ size_t originalLocatorCount = testbed.compareManagement->getLocatorParams().size(); // baseline count before adding
+
+ testbed.compareManagement->addLocatorParams(context, params, ""); // initial request
+ testbed.compareManagement->addLocatorParams(context, params, ""); // same context and params again, i.e. the retry
+
+ size_t expected = originalLocatorCount + 1; // the two calls together are expected to add exactly one entry
+ size_t actual = testbed.compareManagement->getLocatorParams().size();
+
+ BOOST_CHECK_EQUAL(expected, actual);
+}
+
+
/**
* Confirm that we can find a service after we add discovery params that do not use a compare service.
*/
@@ -412,7 +448,7 @@ BOOST_AUTO_TEST_CASE(AddCompare)
try
{
- testbed.management->addCompare("testcompare", testbed.compare);
+ testbed.management->addCompare(AsteriskSCF::Operations::createContext(), "testcompare", testbed.compare);
added = true;
}
@@ -440,12 +476,13 @@ BOOST_AUTO_TEST_CASE(AddDuplicateCompare)
try
{
- testbed.management->addCompare("testcompare", testbed.compare);
+ testbed.management->addCompare(AsteriskSCF::Operations::createContext(), "testcompare", testbed.compare);
added = true;
}
catch (const DuplicateCompare&)
{
+ // expected
}
catch (const Ice::Exception &e)
{
@@ -460,6 +497,46 @@ BOOST_AUTO_TEST_CASE(AddDuplicateCompare)
}
/**
+ * Tests retry logic in addCompare.
+ */
+BOOST_AUTO_TEST_CASE(AddCompareRetry)
+{
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext(); // one context shared by both addCompare calls
+ testbed.management->addCompare(context, "testcompareretry", testbed.compare); // initial request
+ BOOST_CHECK_NO_THROW(testbed.management->addCompare(context, "testcompareretry", testbed.compare)); // repeating it with the same context must not throw
+}
+
+/**
+ * Tests that a retried addCompare of a duplicate name raises DuplicateCompare again.
+ */
+BOOST_AUTO_TEST_CASE(AddCompareDuplicateRetry)
+{
+ testbed.management->addCompare(AsteriskSCF::Operations::createContext(), "testcompareduplicateretry", testbed.compare); // register the name once so this test does not rely on any other test case
+
+ AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext(); // fresh context used for the duplicate attempt and its retry
+
+ try
+ {
+ testbed.management->addCompare(context, "testcompareduplicateretry", testbed.compare); // same name under a new context
+ BOOST_FAIL("Expected exception");
+ }
+ catch (const DuplicateCompare&)
+ {
+ // expected
+ }
+
+ try
+ {
+ testbed.management->addCompare(context, "testcompareduplicateretry", testbed.compare); // retry: same name and same context as the failed attempt
+ BOOST_FAIL("Expected exception");
+ }
+ catch (const DuplicateCompare&)
+ {
+ // expected
+ }
+}
+
+/**
* Confirm that we can add discovery parameters with a compare service.
*/
BOOST_AUTO_TEST_CASE(AddLocatorParamsWithCompareService)
@@ -472,7 +549,7 @@ BOOST_AUTO_TEST_CASE(AddLocatorParamsWithCompareService)
params->category = "test2";
params->service = "default";
- testbed.compareManagement->addLocatorParams(params, "testcompare");
+ testbed.compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "testcompare");
added = true;
}
@@ -533,16 +610,17 @@ BOOST_AUTO_TEST_CASE(UseServiceFoundWithCompareService)
BOOST_AUTO_TEST_CASE(FindMultipleServices)
{
ServiceLocatorParamsPtr params = new ServiceLocatorParams;
- ServiceManagementPrx compareManagement = testbed.management->addService(testbed.compare, "testcompare2");
+ ServiceManagementPrx compareManagement = testbed.management->addService(AsteriskSCF::Operations::createContext(),
+ testbed.compare, "testcompare2");
params->category = "test";
params->service = "default";
- compareManagement->addLocatorParams(params, "");
+ compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
bool found = testbed.findServices("test", "", 2);
- compareManagement->unregister();
+ compareManagement->unregister(AsteriskSCF::Operations::createContext());
BOOST_CHECK(found);
}
@@ -553,16 +631,17 @@ BOOST_AUTO_TEST_CASE(FindMultipleServices)
BOOST_AUTO_TEST_CASE(FindMultipleServicesUsingEmptyCategory)
{
ServiceLocatorParamsPtr params = new ServiceLocatorParams;
- ServiceManagementPrx compareManagement = testbed.management->addService(testbed.compare, "testcompare2");
+ ServiceManagementPrx compareManagement = testbed.management->addService(AsteriskSCF::Operations::createContext(),
+ testbed.compare, "testcompare2");
params->category = "test";
- compareManagement->addLocatorParams(params, "");
+ compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
// This takes into account that the service locator internally publishes an IceStorm topic manager service
bool found = testbed.findServices("", "", 3);
- compareManagement->unregister();
+ compareManagement->unregister(AsteriskSCF::Operations::createContext());
BOOST_CHECK(found);
}
@@ -576,7 +655,7 @@ BOOST_AUTO_TEST_CASE(ServiceSuspend)
try
{
- testbed.compareManagement->suspend();
+ testbed.compareManagement->suspend(AsteriskSCF::Operations::createContext());
suspended = true;
}
catch (const Ice::Exception &e)
@@ -610,7 +689,7 @@ BOOST_AUTO_TEST_CASE(ServiceUnsuspend)
try
{
- testbed.compareManagement->unsuspend();
+ testbed.compareManagement->unsuspend(AsteriskSCF::Operations::createContext());
unsuspended = true;
}
catch (const Ice::Exception &e)
@@ -644,7 +723,7 @@ BOOST_AUTO_TEST_CASE(RemoveNonexistentCompareService)
try
{
- testbed.management->removeCompare("testcompare2");
+ testbed.management->removeCompare(AsteriskSCF::Operations::createContext(), "testcompare2");
removed = true;
... 97 lines suppressed ...
--
asterisk-scf/release/servicediscovery.git
More information about the asterisk-scf-commits
mailing list