[asterisk-scf-commits] asterisk-scf/release/servicediscovery.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue May 8 17:29:13 CDT 2012


branch "master" has been updated
       via  f8976984cbb446e42c72161be404aee13380a2bb (commit)
      from  ab11dc1d69e959ef7966bb359433060723b77c27 (commit)

Summary of changes:
 config/test_component.conf                         |    4 +-
 .../ServiceLocatorStateReplicationIf.ice           |  100 +++---
 src/ServiceLocator.cpp                             |  343 ++++++++++++++++----
 src/ServiceLocatorManagement.cpp                   |  130 ++++++--
 src/ServiceLocatorManagement.h                     |    5 +-
 src/ServiceLocatorStateListener.cpp                |  215 ++++++++-----
 src/ServiceLocatorStateReplicator.h                |    4 +-
 src/ServiceLocatorStateReplicatorApp.cpp           |   11 +-
 src/ServiceManagement.cpp                          |  159 +++++++---
 src/ServiceManagement.h                            |    9 +-
 test/CMakeLists.txt                                |    2 +
 test/TestComparatorBlocking.cpp                    |   11 +-
 test/TestServiceLocator.cpp                        |  168 +++++++++--
 13 files changed, 853 insertions(+), 308 deletions(-)


- Log -----------------------------------------------------------------
commit f8976984cbb446e42c72161be404aee13380a2bb
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue May 8 12:06:58 2012 -0500

    Changes for new retry logic.

diff --git a/config/test_component.conf b/config/test_component.conf
index 7ea0784..c4a3217 100644
--- a/config/test_component.conf
+++ b/config/test_component.conf
@@ -3,7 +3,7 @@
 #
 # Ice configuration
 #
-
+Ice.ThreadPool.Client.Size=4
 # Collocation is incompatible with AMI/AMD which sharing a communicator
 Ice.Default.CollocationOptimized=0
 
@@ -38,7 +38,7 @@ ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 ServiceDiscovery.StateReplicator.Proxy=ServiceLocatorStateReplicatorService:tcp -h 127.0.0.1  -p 4413
 
 # Configure ourselves as a master
-ServiceDiscovery.Standalone=no
+ServiceDiscovery.Standalone=yes
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1  -p 4421
 ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1  -p 4422:udp -h 127.0.0.1  -p 4422
diff --git a/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice b/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice
index 3f509a9..65c0420 100644
--- a/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/ServiceLocator/ServiceLocatorStateReplicationIf.ice
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -19,6 +19,7 @@
 #include <Ice/BuiltinSequences.ice>
 #include <Ice/Identity.ice>
 #include "AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice"
+#include <AsteriskSCF/System/OperationsIf.ice>
 
 module AsteriskSCF
 {
@@ -32,52 +33,57 @@ module ServiceLocator
 ["suppress"]
 module V1
 {
-   const string StateReplicatorComponentCategory = "ServiceLocatorStateReplicatorComponent";
-   const string StateReplicatorDiscoveryCategory = "ServiceLocatorStateReplicator";
-
-   class ServiceLocatorStateItem
-   {
-      string key;
-   };
-
-   sequence<ServiceLocatorStateItem> ServiceLocatorStateItemSeq;
-
-   interface ServiceLocatorStateReplicatorListener
-   {
-	   void stateRemoved(Ice::StringSeq itemKeys);
-	   void stateSet(ServiceLocatorStateItemSeq items);
-   };
-
-   interface ServiceLocatorStateReplicator
-   {
-	   void addListener(ServiceLocatorStateReplicatorListener *listener);
-	   void removeListener(ServiceLocatorStateReplicatorListener *listener);
-	   void setState (ServiceLocatorStateItemSeq items);
-	   void removeState(Ice::StringSeq items);
-	   idempotent ServiceLocatorStateItemSeq getState(Ice::StringSeq itemKeys);
-	   idempotent ServiceLocatorStateItemSeq getAllState();
-   };
-
-   class ServiceLocatorServiceStateItem extends ServiceLocatorStateItem
-   {
-       bool suspended;
-       Object *service;
-       Ice::Identity managementIdentity;
-       string guid;
-   };
-
-   class ServiceLocatorParamsStateItem extends ServiceLocatorStateItem
-   {
-       string serviceKey;
-       AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams params;
-       string compareGuid;
-   };
-
-   class ServiceLocatorComparatorStateItem extends ServiceLocatorStateItem
-   {
-       string name;
-       AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsCompare *comparator;
-   };
+
+const string StateReplicatorComponentCategory = "ServiceLocatorStateReplicatorComponent";
+const string StateReplicatorDiscoveryCategory = "ServiceLocatorStateReplicator";
+
+class ServiceLocatorStateItem
+{
+    string key;
+};
+
+sequence<ServiceLocatorStateItem> ServiceLocatorStateItemSeq;
+
+interface ServiceLocatorStateReplicatorListener
+{
+    idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+    idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext,
+        ServiceLocatorStateItemSeq items);
+};
+
+interface ServiceLocatorStateReplicator
+{
+    idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext,
+        ServiceLocatorStateReplicatorListener *listener);
+    idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext,
+        ServiceLocatorStateReplicatorListener *listener);
+    idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext,
+        ServiceLocatorStateItemSeq items);
+    idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
+    idempotent ServiceLocatorStateItemSeq getState(Ice::StringSeq itemKeys);
+    idempotent ServiceLocatorStateItemSeq getAllState();
+};
+
+class ServiceLocatorServiceStateItem extends ServiceLocatorStateItem
+{
+    bool suspended;
+    Object *service;
+    Ice::Identity managementIdentity;
+    string guid;
+};
+
+class ServiceLocatorParamsStateItem extends ServiceLocatorStateItem
+{
+    string serviceKey;
+    AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams params;
+    string compareGuid;
+};
+
+class ServiceLocatorComparatorStateItem extends ServiceLocatorStateItem
+{
+    string name;
+    AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsCompare *comparator;
+};
 
 }; /* module V1 */
 
diff --git a/src/ServiceLocator.cpp b/src/ServiceLocator.cpp
index 014805d..2a647ed 100644
--- a/src/ServiceLocator.cpp
+++ b/src/ServiceLocator.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,6 +14,11 @@
  * at the top of the source tree.
  */
 
+//
+// These are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
+#include <boost/thread.hpp>
+
 #include <Ice/Ice.h>
 #include <IceStorm/IceStorm.h>
 #include <IceBox/IceBox.h>
@@ -25,19 +30,24 @@
 #include <AsteriskSCF/Logger/IceLogger.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 #include <AsteriskSCF/CollocatedIceStorm/CollocatedIceStorm.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "ServiceLocatorManagement.h"
 #include "ServiceManagement.h"
 #include "ServiceLocatorStateReplicator.h"
 
-using namespace std;
-using namespace AsteriskSCF;
-using namespace AsteriskSCF::System::Discovery;
-using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::ServiceLocator::V1;
 using namespace AsteriskSCF::ServiceDiscovery;
 using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Discovery;
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF;
+using namespace std;
 
 namespace
 {
@@ -55,14 +65,26 @@ public:
         const Ice::CommunicatorPtr& communicator,
         const Ice::StringSeq& args);
     void stop();
+    void onStandby();
+    void activated();
 
 private:
+    void verifyProperties();
+    
     Ice::ObjectAdapterPtr mLocalAdapter;
     Ice::ObjectAdapterPtr mDiscoveryAdapter;
     Ice::ObjectAdapterPtr mManagementAdapter;
     AsteriskSCF::CollocatedIceStorm::CollocatedIceStormPtr mIceStorm;
     ReplicaPtr mReplicaService;
     ServiceLocatorStateReplicatorPrx mStateReplicator;
+    ServiceLocatorManagementImplPtr mLocatorServiceManagement;
+    IceStorm::TopicManagerPrx mTopicManager;
+    ServiceLocatorStateReplicatorListenerPrx mReplicatorListenerProxy;
+    bool mStandalone;
+    Ice::CommunicatorPtr mCommunicator;
+    string mBackplaneAdapterName;
+    string mDiscoveryAdapterName;
+    string mManagementAdapterName;
 };
 
 /**
@@ -71,53 +93,160 @@ private:
 class ReplicaImpl : public Replica
 {
 public:
-    ReplicaImpl(Ice::ObjectAdapterPtr adapter) : mAdapter(adapter), mPaused(false), mActive(true) { }
+    ReplicaImpl(Ice::ObjectAdapterPtr adapter, bool active) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+        mAdapter(adapter),
+        mActive(active),
+        mServiceLocatorApp(0) { }
 
     bool isActive(const Ice::Current&)
     {
+        // naturally idempotent
+        boost::shared_lock<boost::shared_mutex> lock(mMutex);
         return mActive;
     }
 
-    bool activate(const Ice::Current&)
+    bool activate(const OperationContextPtr& context, const Ice::Current&)
     {
-        mActive = true;
+        ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
 
-        for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+        if (!data)
+        {
+            // retry detected
+            return true; // always returns true.
+        }
+
+        try
+        {
+            lg(Info) << "ServiceLocator activate() received...";
+
+            // it's not good to loop through the listeners while holding the lock, so make a copy
+            vector<ReplicaListenerPrx> listeners;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mMutex);
+                mActive = true;
+                listeners = mListeners;
+            }
+
+            if (mServiceLocatorApp)
+            {
+                lg(Info) << "ServiceLocator activate() calling the app's callback...";
+                mServiceLocatorApp->activated();
+            }
+
+            lg(Info) << "ServiceLocator activate() notifying listeners...";
+            for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
+                 listener != listeners.end();
+                 ++listener)
+            {
+                (*listener)->activated(AsteriskSCF::Operations::createContext(context),
+                    ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+            }
+
+            lg(Info) << "ServiceLocator activated.";
+        }
+        catch (const std::exception& e)
+        {
+            data->setException(e);
+            throw;
+        }
+        catch (...)
         {
-            (*listener)->activated(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+            data->setException();
+            throw;
         }
 
+        data->setCompleted();
+
         return true;
     }
 
-    void standby(const Ice::Current&)
+    void standby(const OperationContextPtr& context, const Ice::Current&)
     {
-        mActive = false;
+        ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
 
-        for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+        if (!data)
         {
-            (*listener)->onStandby(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+            // retry detected
+            return;
+        }
+
+        try
+        {
+            // it's not good to loop through the listeners while holding the lock, so make a copy
+            vector<ReplicaListenerPrx> listeners;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mMutex);
+                mActive = false;
+                listeners = mListeners;
+            }
+
+            if (mServiceLocatorApp)
+            {
+                mServiceLocatorApp->onStandby();
+            }
+
+            for (vector<ReplicaListenerPrx>::const_iterator listener = listeners.begin();
+                 listener != listeners.end();
+                 ++listener)
+            {
+                (*listener)->onStandby(AsteriskSCF::Operations::createContext(context),
+                    ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+            }
         }
+        catch (const std::exception& e)
+        {
+            data->setException(e);
+            throw;
+        }
+        catch (...)
+        {
+            data->setException();
+            throw;
+        }
+
+         data->setCompleted();
     }
 
-    void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    void addListener(const OperationContextPtr& context, const ReplicaListenerPrx& listener, const Ice::Current&)
     {
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
         mListeners.push_back(listener);
     }
 
-    void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    void removeListener(const OperationContextPtr& context, const ReplicaListenerPrx& listener, const Ice::Current&)
     {
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
+        boost::unique_lock<boost::shared_mutex> lock(mMutex);
         mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
     }
 
+    void setServiceLocatorApp(ServiceLocatorApp* serviceLocatorApp)
+    {
+        mServiceLocatorApp = serviceLocatorApp;
+    }
+
 private:
-    Ice::ObjectAdapterPtr mAdapter;
+    boost::shared_mutex mMutex;
 
-    vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+    OperationContextCachePtr mOperationContextCache;
 
-    bool mPaused;
+    Ice::ObjectAdapterPtr mAdapter;
+
+    vector<ReplicaListenerPrx> mListeners;
 
     bool mActive;
+
+    ServiceLocatorApp* mServiceLocatorApp; // Plain pointer to avoid smart pointer circular refs. 
 };
 
 /**
@@ -140,13 +269,14 @@ public:
      */
     void locateAll_async(const AMD_ServiceLocator_locateAllPtr&,
         const ServiceLocatorParamsPtr&, const ::Ice::Current&);
+
 private:
     /**
      * A pointer to the ServiceManagement implementation as that is where everything is
      * actually stored. Our ServiceLocator implementation simply acts as a read-only
      *  frontend to it.
      */
-    ServiceLocatorManagementImplPtr mLocatorServiceManagement;
+    const ServiceLocatorManagementImplPtr mLocatorServiceManagement;
 };
 
 }
@@ -156,6 +286,7 @@ void ServiceLocatorImpl::locate_async(const AMD_ServiceLocator_locatePtr& cb,
     const ::Ice::Current&)
 {
     // delegate to the management object, where the data is kept
+    // mLocatorServiceManagement is const; no lock needed
     mLocatorServiceManagement->locate(cb, params);
 }
 
@@ -164,17 +295,68 @@ void ServiceLocatorImpl::locateAll_async(const AMD_ServiceLocator_locateAllPtr&
     const ::Ice::Current&)
 {
     // delegate to the management object, where the data is kept
+    // mLocatorServiceManagement is const; no lock needed
     mLocatorServiceManagement->locateAll(cb, params);
 }
 
+void ServiceLocatorApp::verifyProperties()
+{
+    const int defaultSize(4);
+    string strDefaultSize = boost::lexical_cast<std::string>(defaultSize);
+
+    Ice::Int defaultPoolSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0);
+    
+    // NOTE: The ServiceLocator, unlike other components, uses two adapters for its primary services: management, and discovery. 
+
+    if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mDiscoveryAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+    {
+        if (defaultPoolSize < defaultSize)
+        {
+            lg(Info) << "Configured thread pool size for " << mDiscoveryAdapterName + " is too small, defaulting to " << strDefaultSize;
+            mCommunicator->getProperties()->setProperty(mDiscoveryAdapterName + ".ThreadPool.Size", strDefaultSize);
+        }
+    }
+
+    if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mManagementAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+    {
+        if (defaultPoolSize < defaultSize)
+        {
+            lg(Info) << "Configured thread pool size for " << mManagementAdapterName + " is too small, defaulting to " << strDefaultSize;
+            mCommunicator->getProperties()->setProperty(mManagementAdapterName + ".ThreadPool.Size", strDefaultSize);
+        }
+    }
+
+    if (mCommunicator->getProperties()->getPropertyAsIntWithDefault(mBackplaneAdapterName + ".ThreadPool.Size", 0) < defaultSize)
+    {
+        if (defaultPoolSize < defaultSize)
+        {
+            lg(Info) << "Configured Internal thread pool size for " << mBackplaneAdapterName + " is too small, defaulting to " << strDefaultSize;
+            mCommunicator->getProperties()->setProperty(mBackplaneAdapterName + ".ThreadPool.Size", strDefaultSize);
+        }
+    }
+
+    int clientSize = mCommunicator->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Client.Size", 0);
+    if (clientSize < defaultSize)
+    {
+        lg(Warning) << "Client thread pool size of " << defaultPoolSize << " is too small! Set to " << strDefaultSize << " or greater.";
+        assert(false);
+    }
+}
+
 void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr& communicator,
     const Ice::StringSeq&)
 {
-    mIceStorm = new AsteriskSCF::CollocatedIceStorm::CollocatedIceStorm(appName, communicator->getProperties());
+    mCommunicator = communicator;
+
+    mBackplaneAdapterName = appName + ".BackplaneAdapter";
+    mDiscoveryAdapterName = appName + ".Locator.ServiceAdapter";
+    mManagementAdapterName = appName + ".Management.ServiceAdapter";
+    verifyProperties();
 
-    string backplaneAdapterName = appName + ".BackplaneAdapter";
-    mLocalAdapter = communicator->createObjectAdapterWithEndpoints(backplaneAdapterName,
-        communicator->getProperties()->getPropertyWithDefault(backplaneAdapterName + ".Endpoints", "tcp -p 4410"));
+    mIceStorm = new AsteriskSCF::CollocatedIceStorm::CollocatedIceStorm(appName, communicator->getProperties());
+    
+    mLocalAdapter = communicator->createObjectAdapterWithEndpoints(mBackplaneAdapterName,
+        communicator->getProperties()->getPropertyWithDefault(mBackplaneAdapterName + ".Endpoints", "tcp -p 4410"));
 
     ConfiguredIceLoggerPtr mIceLogger = createIceLogger(mLocalAdapter);
 
@@ -186,11 +368,11 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
 
     /* Talk to the topic manager to either create or get the service discovery topic,
      * configured or default */
-    IceStorm::TopicManagerPrx topicManager = mIceStorm->createTopicManagerProxy(communicator);
+    mTopicManager = mIceStorm->createTopicManagerProxy(communicator);
 
     EventsPrx serviceDiscoveryTopic;
 
-    if (topicManager)
+    if (mTopicManager)
     {
         Ice::PropertiesPtr props = communicator->getProperties();
         string topicName = props->getProperty(appName + ".TopicName");
@@ -203,13 +385,13 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
         IceStorm::TopicPrx topic;
         try
         {
-            topic = topicManager->retrieve(topicName);
+            topic = mTopicManager->retrieve(topicName);
         }
         catch (const IceStorm::NoSuchTopic&)
         {
             try
             {
-                topic = topicManager->create(topicName);
+                topic = mTopicManager->create(topicName);
             }
             catch (const IceStorm::TopicExists&)
             {
@@ -226,22 +408,22 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
         lg(Info) << "IceStorm topic manager proxy not present, events disabled.";
     }
 
-    mReplicaService = new ReplicaImpl(mLocalAdapter);
+    mStandalone = getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false);
+    ReplicaImpl* replicaImpl = new ReplicaImpl(mLocalAdapter, mStandalone);
+    mReplicaService = replicaImpl;
     mLocalAdapter->add(mReplicaService, communicator->stringToIdentity(ReplicaServiceId));
 
     /* Management and discovery use separate adapters to provide a level of security,
      * management may want to be protected so arbitrary people can't inject bad services
      * into the infrastructure while discovery as a read only function may be allowed to all.
      */
+    mManagementAdapter = communicator->createObjectAdapterWithEndpoints(mManagementAdapterName,
+        communicator->getProperties()->getPropertyWithDefault(mManagementAdapterName + ".Endpoints", "tcp -p 4412"));
 
-    string managementAdapterName = appName + ".Management.ServiceAdapter";
-    mManagementAdapter = communicator->createObjectAdapterWithEndpoints(managementAdapterName,
-        communicator->getProperties()->getPropertyWithDefault(managementAdapterName + ".Endpoints", "tcp -p 4412"));
-
-    ServiceLocatorManagementImplPtr locatorServiceManagement =
+    mLocatorServiceManagement =
         new ServiceLocatorManagementImpl(mManagementAdapter, serviceDiscoveryTopic, mReplicaService);
 
-    mManagementAdapter->add(locatorServiceManagement,
+    mManagementAdapter->add(mLocatorServiceManagement,
         communicator->stringToIdentity("LocatorServiceManagement"));
 
     mManagementAdapter->activate();
@@ -249,20 +431,27 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
     try
     {
 	mStateReplicator = ServiceLocatorStateReplicatorPrx::checkedCast(communicator->propertyToProxy(appName + ".StateReplicator.Proxy"));
-        ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(locatorServiceManagement);
-        ServiceLocatorStateReplicatorListenerPrx replicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
+        ServiceLocatorStateReplicatorListenerPtr replicatorListener = new ServiceLocatorStateReplicatorListenerI(mLocatorServiceManagement);
+        mReplicatorListenerProxy = ServiceLocatorStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(replicatorListener));
 
-	locatorServiceManagement->setStateReplicator(mStateReplicator);
+	mLocatorServiceManagement->setStateReplicator(mStateReplicator);
 
-        if (getBooleanPropertyValueWithDefault(communicator->getProperties(), appName + ".Standalone", false))
+        if (mStandalone)
         {
-	    mReplicaService->standby();
-            mStateReplicator->addListener(replicatorListenerProxy);
-            lg(Info) << "Operating as a standby replica." << endl;
+            lg(Info) << "Operating in an active, standalone state and pushing updates." << endl;
         }
-        else
+        else 
         {
-            lg(Info) << "Operating in an active state and pushing updates." << endl;
+            // The default state is standby, in a replica group. 
+            // In this state, the component must be activated via it's Replica interface.
+            if (mStateReplicator == 0)
+            {
+                lg(Error) << "Operating in replica group with no access to state replicator defined!" << endl;
+                lg(Error) << "--- Check config file for <servicename>.StateReplicator.Proxy setting, or run standalone." << endl;
+                assert(false);
+            }
+
+            lg(Info) << "Operating as a standby replica." << endl;
         }
     }
     catch (const std::exception& e)
@@ -275,34 +464,67 @@ void ServiceLocatorApp::start(const string& appName, const Ice::CommunicatorPtr&
 	lg(Warning) << "Operating in an active and standalone state." << endl;
     }
 
-    lg(Info) << "Activated service discovery management.";
+    lg(Info) << "Publishing service discovery management.";
 
-    string locatorAdapterName = appName + ".Locator.ServiceAdapter";
-    mDiscoveryAdapter = communicator->createObjectAdapterWithEndpoints(locatorAdapterName,
-        communicator->getProperties()->getPropertyWithDefault(locatorAdapterName + ".Endpoints", "tcp -p 4411"));
+    mDiscoveryAdapter = communicator->createObjectAdapterWithEndpoints(mDiscoveryAdapterName,
+        communicator->getProperties()->getPropertyWithDefault(mDiscoveryAdapterName + ".Endpoints", "tcp -p 4411"));
 
-    ServiceLocatorPtr locatorService = new ServiceLocatorImpl(locatorServiceManagement);
+    ServiceLocatorPtr locatorService = new ServiceLocatorImpl(mLocatorServiceManagement);
 
     mDiscoveryAdapter->add(locatorService, communicator->stringToIdentity("LocatorService"));
 
-    mDiscoveryAdapter->addFacet(locatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+    mDiscoveryAdapter->addFacet(mLocatorServiceManagement, communicator->stringToIdentity("LocatorService"), ServiceLocatorManagementFacet);
+
+    lg(Info) << "Publishing service discovery.";
+
+    replicaImpl->setServiceLocatorApp(this);
+
+    mDiscoveryAdapter->activate();
 
     // Make our IceStorm topic manager available to all if we are the active service. This is because by adding it
     // we will replicate it.
     if (mReplicaService->isActive() == true)
     {
-	ServiceManagementPrx icestormManagement = locatorServiceManagement->addService(topicManager, "TopicManager", Ice::Current());
-	ServiceLocatorParamsPtr params = new ServiceLocatorParams;
-	params->category = TopicManagerCategory;
-	params->service = "default";
-	icestormManagement->addLocatorParams(params, "");
+        activated();
+        lg(Info) << "Waiting for requests.";
+        return;
     }
 
-    mDiscoveryAdapter->activate();
+    onStandby();
+    lg(Info) << "In standby mode.";
+}
 
-    lg(Info) << "Activated service discovery.";
+void ServiceLocatorApp::activated()
+{
+    lg(Info) << "App activated callback entered.";
 
-    lg(Info) << "Waiting for requests.";
+    if (!mStandalone)
+    {
+        
+        lg(Info) << "  Removing listener from Replicator";
+        // Stop listening to state replicator.
+        mStateReplicator->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
+    }
+
+    lg(Info) << "  Creating topic manager";
+    ServiceManagementPrx icestormManagement = mLocatorServiceManagement->addService(
+    AsteriskSCF::Operations::createContext(), mTopicManager, "TopicManager", Ice::Current());
+    ServiceLocatorParamsPtr params = new ServiceLocatorParams;
+    params->category = TopicManagerCategory;
+    params->service = "default";
+
+    lg(Info) << "  Adding locator params for topic manager";
+    icestormManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
+    lg(Info) << "Activated callback complete";
+}
+
+void ServiceLocatorApp::onStandby()
+{
+    if (!mStandalone)
+    {
+        // Listen to state replicator.
+        mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
+    }
 }
 
 void ServiceLocatorApp::stop()
@@ -323,10 +545,7 @@ void ServiceLocatorApp::stop()
     mIceStorm->stop();
 }
 
-extern "C"
-{
-ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
+extern "C" ASTSCF_DLL_EXPORT IceBox::Service* create(Ice::CommunicatorPtr)
 {
     return new ServiceLocatorApp;
 }
-}
diff --git a/src/ServiceLocatorManagement.cpp b/src/ServiceLocatorManagement.cpp
index 0a953c8..11e31dd 100644
--- a/src/ServiceLocatorManagement.cpp
+++ b/src/ServiceLocatorManagement.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,29 +14,35 @@
  * at the top of the source tree.
  */
 
-#include <Ice/Ice.h>
-#include <IceUtil/UUID.h>
-
+//
+// These are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
 #include <boost/thread.hpp>
 #include <boost/thread/shared_mutex.hpp>
 
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
+
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/Async/ResponseCollector.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "ServiceLocatorStateReplicationIf.h"
 
 #include "ServiceLocatorManagement.h"
 #include "ServiceManagement.h"
 
-using namespace std;
-using namespace AsteriskSCF::System::Discovery;
-using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::ServiceLocator::V1;
 using namespace AsteriskSCF::ServiceDiscovery;
+using namespace AsteriskSCF::System::Discovery;
+using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF;
+using namespace std;
 
 //
 // Normally would use an anonymous namespace here, but there is a compiler
@@ -47,6 +53,9 @@ namespace ServiceLocatorManagementImplNS
 
 const Logger& lg = getLoggerFactory().getLogger("AsteriskSCF.System.Discovery");
 
+typedef ContextResultData<ServiceManagementPrx> AddServiceResultData;
+typedef boost::shared_ptr<AddServiceResultData> AddServiceResultDataPtr;
+
 /** Parameter type for Locator, to allow use of ResponseCollector. */
 typedef std::pair<bool, ServiceManagementImplPtr> LocateParam;
 
@@ -317,9 +326,10 @@ public:
     ServiceLocatorManagementImplPriv(const Ice::ObjectAdapterPtr& adapter,
         const EventsPrx& serviceDiscoveryTopic,
 	const AsteriskSCF::System::Component::V1::ReplicaPtr replicaService) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic), mReplicaService(replicaService)
     {
-    };
+    }
 
     /**
      * Shared mutex lock which protects the services and comparators.
@@ -327,6 +337,11 @@ public:
     boost::shared_mutex mLock;
 
     /**
+     * Context cache for retry detection.
+     */
+    OperationContextCachePtr mOperationContextCache;
+
+    /**
      * Object adapter that our service management proxies originate from, it is
      * houses the main management service.
      */
@@ -420,8 +435,16 @@ void ServiceLocatorManagementImpl::locateAll(
  * Implementation of the addService method as defined in service_locator.ice
  */
 ServiceManagementPrx ServiceLocatorManagementImpl::addService(
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const Ice::ObjectPrx& service, const string& guid, const Ice::Current&)
 {
+    std::pair<bool, AddServiceResultDataPtr> cacheHit = getContextSync<AddServiceResultDataPtr>(mImpl->mOperationContextCache, context);
+
+    if (cacheHit.first)
+    {
+        return cacheHit.second->getResult();
+    }
+
     lg(Debug) << "addService(" << guid << ')';
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 
@@ -434,10 +457,12 @@ ServiceManagementPrx ServiceLocatorManagementImpl::addService(
 
     mImpl->mServices.push_back(new_service);
 
-    return new_service->getServiceManagementPrx();
+    ServiceManagementPrx r = new_service->getServiceManagementPrx();
+    cacheHit.second->setResult(r);
+    return r;
 }
 
-ServiceManagementImplPtr ServiceLocatorManagementImpl::addService(const Ice::ObjectPrx& service, 
+ServiceManagementImplPtr ServiceLocatorManagementImpl::addService(const Ice::ObjectPrx& service,
     const std::string& guid, const Ice::Identity& identity)
 {
     lg(Debug) << "addService(" << guid << ')';
@@ -484,42 +509,87 @@ ServiceInfo ServiceLocatorManagementImpl::getService(const std::string &guid, co
 /**
  * Implementation of the addCompare method as defined in service_locator.ice
  */
-void ServiceLocatorManagementImpl::addCompare(const string& guid,
+void ServiceLocatorManagementImpl::addCompare(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& guid,
     const ServiceLocatorParamsComparePrx& service, const Ice::Current&)
 {
-    lg(Debug) << "addCompare(" << guid << ')';
-    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
-
-    pair<map<string, ServiceLocatorParamsComparePrx>::iterator, bool> insertPair =
-        mImpl->mCompares.insert(make_pair(guid, service));
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
-    if (insertPair.second == false)
+    if (!data)
     {
-        throw DuplicateCompare();
+        // retry detected
+        return;
     }
 
-    if (mImpl->mLocatorTopic)
+    try
+    {
+        lg(Debug) << "addCompare(" << guid << ')';
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+        pair<map<string, ServiceLocatorParamsComparePrx>::iterator, bool> insertPair =
+            mImpl->mCompares.insert(make_pair(guid, service));
+
+        if (insertPair.second == false)
+        {
+            throw DuplicateCompare();
+        }
+
+        if (mImpl->mLocatorTopic)
+        {
+            mImpl->mLocatorTopic->comparisonRegistered(AsteriskSCF::Operations::createContext(context), guid);
+        }
+        data->setCompleted();
+    }
+    catch(const std::exception& e)
     {
-        mImpl->mLocatorTopic->comparisonRegistered(guid);
+        data->setException(e);
+        throw;
+    }
+    catch(...)
+    {
+        data->setException();
+        throw;
     }
 }
 
 /**
  * Implementation of the removeCompare method as defined in service_locator.ice
  */
-void ServiceLocatorManagementImpl::removeCompare(const string& guid, const Ice::Current&)
+void ServiceLocatorManagementImpl::removeCompare(const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const string& guid, const Ice::Current&)
 {
-    lg(Debug) << "removeCompare(" << guid << ')';
-    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
-    std::map<std::string, ServiceLocatorParamsComparePrx>::size_type erased = mImpl->mCompares.erase(guid);
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
-    if (!erased)
+    if (!data)
     {
-        throw CompareNotFound();
+        // retry detected
+        return;
     }
-    else if (mImpl->mLocatorTopic)
+
+    try
+    {
+        lg(Debug) << "removeCompare(" << guid << ')';
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+        std::map<std::string, ServiceLocatorParamsComparePrx>::size_type erased = mImpl->mCompares.erase(guid);
+
+        if (!erased)
+        {
+            throw CompareNotFound();
+        }
+        else if (mImpl->mLocatorTopic)
+        {
+            mImpl->mLocatorTopic->comparisonUnregistered(AsteriskSCF::Operations::createContext(context), guid);
+        }
+        data->setCompleted();
+    }
+    catch(const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch(...)
     {
-        mImpl->mLocatorTopic->comparisonUnregistered(guid);
+        data->setException();
+        throw;
     }
 }
 
@@ -612,7 +682,7 @@ void ServiceLocatorManagementImpl::replicateState(const ServiceLocatorStateItemP
     try
     {
         ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
-        oneway->setState(items);
+        oneway->setState(AsteriskSCF::Operations::createContext(), items);
     }
 	catch (const Ice::NoEndpointException&)
     {
@@ -642,7 +712,7 @@ void ServiceLocatorManagementImpl::removeState(const ServiceLocatorStateItemPtr&
     try
     {
         ServiceLocatorStateReplicatorPrx oneway = ServiceLocatorStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
-        oneway->removeState(items);
+        oneway->removeState(AsteriskSCF::Operations::createContext(), items);
     }
 	catch (const Ice::NoEndpointException&)
     {
diff --git a/src/ServiceLocatorManagement.h b/src/ServiceLocatorManagement.h
index 468565f..a5aa052 100644
--- a/src/ServiceLocatorManagement.h
+++ b/src/ServiceLocatorManagement.h
@@ -87,15 +87,16 @@ public:
     // AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagement interface.
     //
     AsteriskSCF::Core::Discovery::V1::ServiceManagementPrx addService(
+        const AsteriskSCF::System::V1::OperationContextPtr&, 
         const Ice::ObjectPrx&, const std::string&, const Ice::Current&);
     AsteriskSCF::Core::Discovery::V1::ServiceInfoSeq getServices(
         const ::Ice::Current&) const;
     AsteriskSCF::Core::Discovery::V1::ServiceInfo getService(
         const std::string &, const ::Ice::Current&) const;
-    void addCompare(const std::string&,
+    void addCompare(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string&,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsComparePrx&,
         const Ice::Current&);
-    void removeCompare(const std::string&, const Ice::Current& = Ice::Current());
+    void removeCompare(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string&, const Ice::Current& = Ice::Current());
 
     void isSupported(const std::string&,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&,
diff --git a/src/ServiceLocatorStateListener.cpp b/src/ServiceLocatorStateListener.cpp
index a549e3b..498d313 100644
--- a/src/ServiceLocatorStateListener.cpp
+++ b/src/ServiceLocatorStateListener.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,16 +14,17 @@
  * at the top of the source tree.
  */
 
-#include <IceUtil/UUID.h>
-
 #include <boost/thread.hpp>
 #include <boost/shared_ptr.hpp>
 
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
-// #include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <IceUtil/UUID.h>
 
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "ServiceLocatorManagement.h"
 #include "ServiceManagement.h"
@@ -33,24 +34,26 @@
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Replication::ServiceLocator::V1;
 using namespace AsteriskSCF::ServiceDiscovery;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
 
 class ServiceLocatorStateReplicatorItem
 {
 public:
     ServiceLocatorStateReplicatorItem(const ServiceLocatorManagementImplPtr& locatorManagement) :
-	  mLocatorManagement(locatorManagement) 
-    { 
+        mLocatorManagement(locatorManagement)
+    {
     }
 
     ~ServiceLocatorStateReplicatorItem()
     {
 	if (mService)
 	{
-	    mService->unregister();
+	    mService->unregister(AsteriskSCF::Operations::createContext());
 	}
 	if (!mComparator.empty())
 	{
-	    mLocatorManagement->removeCompare(mComparator);
+	    mLocatorManagement->removeCompare(AsteriskSCF::Operations::createContext(), mComparator);
 	}
     }
 
@@ -88,93 +91,130 @@ private:
 struct ServiceLocatorStateReplicatorListenerImpl
 {
 public:
-    ServiceLocatorStateReplicatorListenerImpl(ServiceLocatorManagementImplPtr management)
-        : mId(IceUtil::generateUUID()), mLocatorManagement(management) {}
+    ServiceLocatorStateReplicatorListenerImpl(ServiceLocatorManagementImplPtr management) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+        mId(IceUtil::generateUUID()), mLocatorManagement(management) {}
+
     void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
     {
+        boost::mutex::scoped_lock lock(mMutex);
         for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
         {
             // Just erasing this from the map will cause the destructor to actually shut things down
             mStateItems.erase((*key));
         }
     }
-    void setStateNoticeImpl(const ServiceLocatorStateItemSeq& items)
+    void setStateNoticeImpl(const OperationContextPtr& context, const ServiceLocatorStateItemSeq& items)
     {
-        for (ServiceLocatorStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+        ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+
+        if (!data)
         {
-	    ServiceLocatorServiceStateItemPtr serviceState;
-	    ServiceLocatorParamsStateItemPtr paramsState;
-	    ServiceLocatorComparatorStateItemPtr comparatorState;
-
-	    if ((serviceState = ServiceLocatorServiceStateItemPtr::dynamicCast((*item))))
-	    {
-		std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
-	        boost::shared_ptr<ServiceLocatorStateReplicatorItem> localitem;
-
-		if ((i == mStateItems.end()))
-		{
-		    boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
-                    localitem = newitem;
-                    mStateItems.insert(std::make_pair((*item)->key, newitem));
-		    ServiceManagementImplPtr service = mLocatorManagement->addService(serviceState->service, serviceState->guid, serviceState->managementIdentity);
-		    newitem->setService(service);
-		}
-		else
-		{
-		    localitem = i->second;
-		}
-
-		// The only thing that can be changed by a subsequent state item is the suspend status
-		if (serviceState->suspended == true)
-		{
-		    localitem->getService()->suspend();
-		}
-		else
-		{
-		    localitem->getService()->unsuspend();
-		}
-	    }
-	    else if ((paramsState = ServiceLocatorParamsStateItemPtr::dynamicCast((*item))))
-	    {
-		// This is special, we have to find the respective service and then add parameters to it
-		std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find(paramsState->serviceKey);
-
-	        if ((i == mStateItems.end()))
-		{
-		    continue;
-		}
-
-		// Parameters are only ever added, they are never modified or removed
-		i->second->getService()->addLocatorParams(paramsState->params, paramsState->compareGuid);
-	    }
-	    else if ((comparatorState = ServiceLocatorComparatorStateItemPtr::dynamicCast((*item))))
-	    {
-		std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
-
-                if ((i != mStateItems.end()))
-		{
-		    // If this happens we essentially got a duplicate state item for something that should never change, so ignore it
-		    continue;
-		}
-
-		try
-		{
-		    Ice::Current current;
-		    mLocatorManagement->addCompare(comparatorState->name, comparatorState->comparator, current);
-		    boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
-		    mStateItems.insert(std::make_pair((*item)->key, newitem));
-		    newitem->setComparator(comparatorState->name);
-		}
-		catch (...)
-		{
-		    // It is possible for this to get reached if a comparator exists locally with the same name as the one we just tried to add
-		}
-	    }
+            // retry detected
+            return;
         }
+
+        boost::mutex::scoped_lock lock(mMutex);
+
+        try
+        {
+            for (ServiceLocatorStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+            {
+                ServiceLocatorServiceStateItemPtr serviceState;
+                ServiceLocatorParamsStateItemPtr paramsState;
+                ServiceLocatorComparatorStateItemPtr comparatorState;
+
+                if ((serviceState = ServiceLocatorServiceStateItemPtr::dynamicCast((*item))))
+                {
+                    std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
+                    boost::shared_ptr<ServiceLocatorStateReplicatorItem> localitem;
+
+                    if ((i == mStateItems.end()))
+                    {
+                        boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
+                        localitem = newitem;
+                        mStateItems.insert(std::make_pair((*item)->key, newitem));
+                        ServiceManagementImplPtr service = mLocatorManagement->addService(
+                            serviceState->service, serviceState->guid, serviceState->managementIdentity);
+                        newitem->setService(service);
+                    }
+                    else
+                    {
+                        localitem = i->second;
+                    }
+
+                    // The only thing that can be changed by a subsequent state item is the suspend status
+                    if (serviceState->suspended == true)
+                    {
+                        localitem->getService()->suspend(AsteriskSCF::Operations::createContext(context));
+                    }
+                    else
+                    {
+                        localitem->getService()->unsuspend(AsteriskSCF::Operations::createContext(context));
+                    }
+                }
+                else if ((paramsState = ServiceLocatorParamsStateItemPtr::dynamicCast((*item))))
+                {
+                    // This is special, we have to find the respective service and then add parameters to it
+                    std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find(paramsState->serviceKey);
+
+                    if ((i == mStateItems.end()))
+                    {
+                        continue;
+                    }
+
+                    // Parameters are only ever added, they are never modified or removed
+                    i->second->getService()->addLocatorParams(AsteriskSCF::Operations::createContext(context),
+                        paramsState->params, paramsState->compareGuid);
+                }
+                else if ((comparatorState = ServiceLocatorComparatorStateItemPtr::dynamicCast((*item))))
+                {
+                    std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> >::iterator i = mStateItems.find((*item)->key);
+
+                    if ((i != mStateItems.end()))
+                    {
+                        // If this happens we essentially got a duplicate state item for something that should never change, so ignore it
+                        continue;
+                    }
+
+                    try
+                    {
+                        Ice::Current current;
+                        mLocatorManagement->addCompare(AsteriskSCF::Operations::createContext(context), comparatorState->name, comparatorState->comparator, current);
+                        boost::shared_ptr<ServiceLocatorStateReplicatorItem> newitem(new ServiceLocatorStateReplicatorItem(mLocatorManagement));
+                        mStateItems.insert(std::make_pair((*item)->key, newitem));
+                        newitem->setComparator(comparatorState->name);
+                    }
+                    catch (...)
+                    {
+                        // It is possible for this to get reached if a comparator exists locally with the same name as the one we just tried to add
+                    }
+                }
+            }
+        }
+        catch (const std::exception& e)
+        {
+            data->setException(e);
+            throw;
+        }
+        catch (...)
+        {
+            assert(false);
+            data->setException();
+            throw;
+        }
+
+        data->setCompleted();
     }
-    std::string mId;
+
+    const std::string& getId() const { return mId; }
+
+private:
+    boost::mutex mMutex;
+    OperationContextCachePtr mOperationContextCache;
+    const std::string mId;
     std::map<std::string, boost::shared_ptr<ServiceLocatorStateReplicatorItem> > mStateItems;
-    ServiceLocatorManagementImplPtr mLocatorManagement;
+    const ServiceLocatorManagementImplPtr mLocatorManagement;
 };
 
 ServiceLocatorStateReplicatorListenerI::ServiceLocatorStateReplicatorListenerI(const ServiceLocatorManagementImplPtr& management)
@@ -182,17 +222,18 @@ ServiceLocatorStateReplicatorListenerI::ServiceLocatorStateReplicatorListenerI(c
 {
 }
 
-void ServiceLocatorStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
+void ServiceLocatorStateReplicatorListenerI::stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq& itemKeys, const Ice::Current&)
 {
+    // naturally idempotent
     mImpl->removeStateNoticeImpl(itemKeys);
 }
 
-void ServiceLocatorStateReplicatorListenerI::stateSet(const ServiceLocatorStateItemSeq& items, const Ice::Current&)
+void ServiceLocatorStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr& context, const ServiceLocatorStateItemSeq& items, const Ice::Current&)
 {
-    mImpl->setStateNoticeImpl(items);
+    mImpl->setStateNoticeImpl(context, items);
 }
 
 bool ServiceLocatorStateReplicatorListenerI::operator==(const ServiceLocatorStateReplicatorListenerI &rhs)
 {
-    return mImpl->mId == rhs.mImpl->mId;
+    return mImpl->getId() == rhs.mImpl->getId();
 }
diff --git a/src/ServiceLocatorStateReplicator.h b/src/ServiceLocatorStateReplicator.h
index 8b64b67..c24dd2e 100644
--- a/src/ServiceLocatorStateReplicator.h
+++ b/src/ServiceLocatorStateReplicator.h
@@ -41,8 +41,8 @@ class ServiceLocatorStateReplicatorListenerI :
 {
 public:
     ServiceLocatorStateReplicatorListenerI(const AsteriskSCF::ServiceDiscovery::ServiceLocatorManagementImplPtr&);
-    void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
-    void stateSet(const AsteriskSCF::Replication::ServiceLocator::V1::ServiceLocatorStateItemSeq&, const Ice::Current&);
+    void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+    void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Replication::ServiceLocator::V1::ServiceLocatorStateItemSeq&, const Ice::Current&);
     bool operator==(const ServiceLocatorStateReplicatorListenerI& rhs);
 private:
     boost::shared_ptr<ServiceLocatorStateReplicatorListenerImpl> mImpl;
diff --git a/src/ServiceLocatorStateReplicatorApp.cpp b/src/ServiceLocatorStateReplicatorApp.cpp
index ecc505f..3035efc 100644
--- a/src/ServiceLocatorStateReplicatorApp.cpp
+++ b/src/ServiceLocatorStateReplicatorApp.cpp
@@ -14,6 +14,11 @@
  * at the top of the source tree.
  */
 
+//
+// These are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
+#include <boost/thread.hpp>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
 #include <IceStorm/IceStorm.h>
@@ -65,17 +70,17 @@ public:
     ComponentServiceImpl(ServiceLocatorStateReplicatorService &service) : mService(service) {}
 
 public: // Overrides of the ComponentService interface.
-    virtual void suspend(const ::Ice::Current& = ::Ice::Current())
+    virtual void suspend(const AsteriskSCF::System::V1::OperationContextPtr&, const ::Ice::Current& = ::Ice::Current())
     {
         // TBD
     }
 
-    virtual void resume(const ::Ice::Current& = ::Ice::Current())
+    virtual void resume(const AsteriskSCF::System::V1::OperationContextPtr&, const ::Ice::Current& = ::Ice::Current())
     {
         // TBD
     }
 
-    virtual void shutdown(const ::Ice::Current& = ::Ice::Current())
+    virtual void shutdown(const AsteriskSCF::System::V1::OperationContextPtr&, const ::Ice::Current& = ::Ice::Current())
     {
         // TBD
     }
diff --git a/src/ServiceManagement.cpp b/src/ServiceManagement.cpp
index fac2e48..19bc1b7 100644
--- a/src/ServiceManagement.cpp
+++ b/src/ServiceManagement.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,29 +14,38 @@
  * at the top of the source tree.
  */
 
+//
+// These are moved up in include order because boost seems to have some kind of name collision on Windows.
+//
+#include <boost/thread.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
 
-#include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 #include <boost/shared_ptr.hpp>
 
-#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Async/ResponseCollector.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
+#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Async/ResponseCollector.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "ServiceLocatorStateReplicationIf.h"
 
 #include "ServiceLocatorManagement.h"
 #include "ServiceManagement.h"
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::ServiceLocator::V1;
 using namespace AsteriskSCF::ServiceDiscovery;
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF;
+using namespace std;
 
 namespace
 {
@@ -101,6 +110,7 @@ public:
     ServiceManagementImplPriv(ServiceManagementImpl* impl, ServiceLocatorManagementImplPtr management,
         const Ice::ObjectPrx& service, const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::System::Discovery::EventsPrx& serviceDiscoveryTopic,
         const string& guid, const Ice::Identity& identity) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mStateItem(new ServiceLocatorServiceStateItem()),
         mManagement(management), mAdapter(adapter), mLocatorTopic(serviceDiscoveryTopic)
     {
@@ -112,7 +122,7 @@ public:
         mStateItem->managementIdentity = identity;
         if (mLocatorTopic)
         {
-            mLocatorTopic->serviceRegistered(guid);
+            mLocatorTopic->serviceRegistered(AsteriskSCF::Operations::createContext(), guid);
         }
         mManagement->replicateState(mStateItem);
     }
@@ -122,6 +132,8 @@ public:
      */
     boost::shared_mutex mLock;
 
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * Service state replication item.
      */
@@ -275,8 +287,8 @@ void ServiceManagementImpl::isSupported(const ServiceLocatorParamsPtr& params, c
     {
         if ((*spec)->isSupported(params, myCallback))
         {
-            // If we get here, a match was found without needing to call an external comparator. 
-            // We are done. 
+            // If we get here, a match was found without needing to call an external comparator.
+            // We are done.
             break;
         }
     }
@@ -295,13 +307,13 @@ const std::string& ServiceManagementImpl::getGuid() const
  *               that is trying to be found.
  * @param callback Callback to asynchronously rx the results.
  *
- * @return True only if we determined a match without any need for calling a custom comparator. 
- *         Otherwise, the callback may be accumulating the results, or it's not a match. 
+ * @return True only if we determined a match without any need for calling a custom comparator.
+ *         Otherwise, the callback may be accumulating the results, or it's not a match.
  */
 bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params, const IsSupportedCallbackPtr& callback)
 {
-    // Does the component doing the locate 
-    // want to filter based on category. 
+    // Does the component doing the locate
+    // want to filter based on category.
     if (!params->category.empty())
     {
         // Is this the wrong category?
@@ -314,18 +326,18 @@ bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params
         }
     }
 
-    // Does the component doing the locate 
-    // want all services in the category? 
+    // Does the component doing the locate
+    // want all services in the category?
     if (params->service.empty())
     {
-        // If a comparator was provided then yield to it. 
+        // If a comparator was provided then yield to it.
         if (!mStateItem->compareGuid.empty())
         {
             mManagement->isSupported(mStateItem->compareGuid, params, callback);
             return false;
         }
 
-        // Ignore the id and treat this as a wildcard search. 
+        // Ignore the id and treat this as a wildcard search.
 
         lg(Trace) << "  ...isSupported" << debugPrintParams(params) + " = true. Category match explicit, wildcard match service.";
         callback->result(true);
@@ -352,15 +364,15 @@ bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params
         }
     }
 
-    // If a comparator was provided then yield to it. 
+    // If a comparator was provided then yield to it.
     if (!mStateItem->compareGuid.empty())
     {
         mManagement->isSupported(mStateItem->compareGuid, params, callback);
         return false;
     }
 
-    // If we get here we have a match on service and id. 
-    // (and category, if one was passed in.) 
+    // If we get here we have a match on service and id.
+    // (and category, if one was passed in.)
     lg(Trace) << "  ...isSupported" << debugPrintParams(params) + " = true";
     callback->result(true);
     return true;
@@ -369,8 +381,16 @@ bool ServiceLocatorParamsSpec::isSupported(const ServiceLocatorParamsPtr& params
 /**
  * Implementation of the addLocatorParams method as defined in service_locator.ice
  */
-void ServiceManagementImpl::addLocatorParams(const ServiceLocatorParamsPtr& params, const std::string& compareGuid, const Ice::Current&)
+void ServiceManagementImpl::addLocatorParams(
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const ServiceLocatorParamsPtr& params, const std::string& compareGuid, const Ice::Current&)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 
     boost::shared_ptr<ServiceLocatorParamsSpec> spec(new ServiceLocatorParamsSpec(params, compareGuid, mImpl->mManagement, mImpl->mStateItem));
@@ -381,41 +401,87 @@ void ServiceManagementImpl::addLocatorParams(const ServiceLocatorParamsPtr& para
 /**
  * Implementation of the suspend method as defined in service_locator.ice
  */
-void ServiceManagementImpl::suspend(const Ice::Current&)
+void ServiceManagementImpl::suspend(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current&)
 {
-    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
-    if (!mImpl->mStateItem->suspended)
+    if (!data)
     {
-        lg(Info) << "Suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
-        mImpl->mStateItem->suspended = true;
-        mImpl->mManagement->replicateState(mImpl->mStateItem);
+        // retry detected
+        return;
     }
 
-    if (mImpl->mLocatorTopic)
+    try
     {
-        mImpl->mLocatorTopic->serviceSuspended(mImpl->mStateItem->guid);
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+        if (!mImpl->mStateItem->suspended)
+        {
+            lg(Info) << "Suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
+            mImpl->mStateItem->suspended = true;
+            mImpl->mManagement->replicateState(mImpl->mStateItem);
+        }
+
+        if (mImpl->mLocatorTopic)
+        {
+            mImpl->mLocatorTopic->serviceSuspended(AsteriskSCF::Operations::createContext(context), mImpl->mStateItem->guid);
+        }
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        assert(false);
+        data->setException();
+        throw;
     }
 }
 
 /**
  * Implementation of the unsuspend method as defined in service_locator.ice
  */
-void ServiceManagementImpl::unsuspend(const Ice::Current&)
+void ServiceManagementImpl::unsuspend(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current&)
 {
-    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
-    if (mImpl->mStateItem->suspended)
+    if (!data)
     {
-        lg(Info) << "Un-suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
-        mImpl->mStateItem->suspended = false;
-        mImpl->mManagement->replicateState(mImpl->mStateItem);
+        // retry detected
+        return;
     }
 
-    if (mImpl->mLocatorTopic)
+    try
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+        if (mImpl->mStateItem->suspended)
+        {
+            lg(Info) << "Un-suspending " << mImpl->mStateItem->guid << " " << mImpl->mStateItem->service->ice_toString();
+            mImpl->mStateItem->suspended = false;
+            mImpl->mManagement->replicateState(mImpl->mStateItem);
+        }
+
+        if (mImpl->mLocatorTopic)
+        {
+            mImpl->mLocatorTopic->serviceUnsuspended(context, mImpl->mStateItem->guid);
+        }
+    }
+    catch (const std::exception& e)
     {
-        mImpl->mLocatorTopic->serviceUnsuspended(mImpl->mStateItem->guid);
+        data->setException(e);
+        throw;
     }
+    catch (...)
+    {
+        assert(false);
+        data->setException();
+        throw;
+    }
+    data->setCompleted();
 }
 
 ServiceStatus ServiceManagementImpl::getStatus() const
@@ -433,7 +499,6 @@ ServiceStatus ServiceManagementImpl::getStatus() const
 
 ServiceLocatorParamsSeq ServiceManagementImpl::getLocatorParams(const Ice::Current&)
 {
-    
     boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
     ServiceLocatorParamsSeq result;
     for (std::vector<boost::shared_ptr<ServiceLocatorParamsSpec> >::const_iterator iter = mImpl->mSupportedLocatorParams.begin();
@@ -447,8 +512,16 @@ ServiceLocatorParamsSeq ServiceManagementImpl::getLocatorParams(const Ice::Curre
 /**
  * Implementation of the unregister method as defined in service_locator.ice
  */
-void ServiceManagementImpl::unregister(const Ice::Current&)
+void ServiceManagementImpl::unregister(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current&)
 {
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
+
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
     /* You'll notice no lock here. That's because we aren't actually modifying any internal state that should
      * be protected, and if we did lock here there is a chance for a deadlock which is super sad.
      */
@@ -464,19 +537,21 @@ void ServiceManagementImpl::unregister(const Ice::Current&)
 
         if (mImpl->mLocatorTopic)
         {
-            mImpl->mLocatorTopic->serviceUnregistered(mImpl->mStateItem->guid);
+            mImpl->mLocatorTopic->serviceUnregistered(AsteriskSCF::Operations::createContext(context), mImpl->mStateItem->guid);
         }
+
+        data->setCompleted();
     }
     catch(const std::exception& e)
     {
         lg(Error) << BOOST_CURRENT_FUNCTION << " : " << e.what();
+        data->setException(e);
 	throw;
     }
     catch(...)
     {
         lg(Error) << BOOST_CURRENT_FUNCTION << " : " << "Unknown exception.";
+        data->setException();
 	throw;
     }
 }
-
-
diff --git a/src/ServiceManagement.h b/src/ServiceManagement.h
index a1966d7..e8f0907 100644
--- a/src/ServiceManagement.h
+++ b/src/ServiceManagement.h
@@ -39,10 +39,11 @@ public:
     //
     // AsteriskSCF::Core::Discovery::V1::ServiceManagement interface.
     //
-    void addLocatorParams(const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&, const std::string&, const Ice::Current& = Ice::Current());
-    void suspend(const Ice::Current& = Ice::Current());
-    void unsuspend(const Ice::Current& = Ice::Current());
-    void unregister(const Ice::Current& = Ice::Current());
+    void addLocatorParams(const AsteriskSCF::System::V1::OperationContextPtr&, 
+        const AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsPtr&, const std::string&, const Ice::Current& = Ice::Current());
+    void suspend(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::Current& = Ice::Current());
+    void unsuspend(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::Current& = Ice::Current());
+    void unregister(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::Current& = Ice::Current());
     AsteriskSCF::Core::Discovery::V1::ServiceStatus getStatus(const Ice::Current&) const { return getStatus(); }
 
     AsteriskSCF::Core::Discovery::V1::ServiceLocatorParamsSeq getLocatorParams(const Ice::Current&);
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index bbfa333..1b08f67 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,3 +1,4 @@
+include_directories(${astscf-ice-util-cpp_dir}/include)
 astscf_component_init(service_locator_test)
 astscf_component_add_files(service_locator_test TestServiceLocator.cpp)
 astscf_component_add_files(service_locator_test TestComparatorBlocking.cpp)
@@ -6,5 +7,6 @@ astscf_component_add_boost_libraries(service_locator_test unit_test_framework th
 astscf_component_add_slice_collection_libraries(service_locator_test ASTSCF)
 astscf_component_build_icebox(service_locator_test)
 astscf_test_icebox(service_locator_test config/test_component.conf)
+target_link_libraries(service_locator_test LoggingClient ASTSCFIceUtilCpp)
 
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/AsteriskSCFIceStorm)
diff --git a/test/TestComparatorBlocking.cpp b/test/TestComparatorBlocking.cpp
index 7272313..234180b 100644
--- a/test/TestComparatorBlocking.cpp
+++ b/test/TestComparatorBlocking.cpp
@@ -34,6 +34,7 @@
 #include <Ice/Ice.h>
 
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
 using namespace AsteriskSCF::Core::Discovery::V1;
 
@@ -149,7 +150,7 @@ public:
         discovery(ServiceLocatorPrx::checkedCast(
                 discoveryCommunicator->stringToProxy(
                     "LocatorService:tcp -h 127.0.0.1 -p 4411"))),
-        proxy(management->addService(management, "self")),
+        proxy(management->addService(AsteriskSCF::Operations::createContext(), management, "self")),
         params(new ServiceLocatorParams())
     {
         BOOST_REQUIRE(blockerProxy != 0);
@@ -157,17 +158,17 @@ public:
         BOOST_REQUIRE(discovery != 0);
 
         blockerAdapter->activate();
-        management->addCompare("blocker", blockerProxy);
+        management->addCompare(AsteriskSCF::Operations::createContext(), "blocker", blockerProxy);
 
         params->category = "self";
-        proxy->addLocatorParams(params, "blocker");
+        proxy->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "blocker");
     }
 
     ~Fixture()
     {
         blocker->unblock();
-        proxy->unregister();
-        management->removeCompare("blocker");
+        proxy->unregister(AsteriskSCF::Operations::createContext());
+        management->removeCompare(AsteriskSCF::Operations::createContext(), "blocker");
         blockerCommunicator->shutdown();
         blockerCommunicator->waitForShutdown();
 
diff --git a/test/TestServiceLocator.cpp b/test/TestServiceLocator.cpp
index 6c698d0..aebc687 100644
--- a/test/TestServiceLocator.cpp
+++ b/test/TestServiceLocator.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -26,6 +26,8 @@
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorEventsIf.h>
 
+#include <AsteriskSCF/Operations/OperationContext.h>
+
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 
@@ -102,7 +104,7 @@ public:
             params->category = category;
             params->service = service;
 
-            // Actually do the locate request 
+            // Actually do the locate request
             foundCompare = ServiceLocatorParamsComparePrx::uncheckedCast(discovery->locate(params));
 
             /* If we get here we didn't get an exception back telling us no service found... which is wrong! */
@@ -111,7 +113,7 @@ public:
         {
             found = false;
         }
-        catch (const Ice::Exception &e)
+        catch (const Ice::Exception& e)
         {
             BOOST_TEST_MESSAGE(e.ice_name());
             BOOST_TEST_MESSAGE(e.what());
@@ -198,7 +200,7 @@ struct GlobalIceFixture
         {
             throw "Invalid service discovery proxy";
         }
-        
+
     } // end Fixture() constructor
 
     ~GlobalIceFixture()
@@ -290,7 +292,8 @@ BOOST_AUTO_TEST_CASE(AddService)
      */
     try
     {
-        testbed.compareManagement = testbed.management->addService(testbed.compare, "testcompare");
+        testbed.compareManagement = testbed.management->addService(AsteriskSCF::Operations::createContext(),
+            testbed.compare, "testcompare");
         added = true;
     }
     catch (const Ice::Exception &e)
@@ -306,6 +309,19 @@ BOOST_AUTO_TEST_CASE(AddService)
 }
 
 /**
+ * Test retry logic for adding a service
+ */
+BOOST_AUTO_TEST_CASE(AddServiceRetry)
+{
+    AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+    ServiceManagementPrx expected = testbed.management->addService(context, testbed.compare, "testcompare");
+    ServiceManagementPrx actual = testbed.management->addService(context, testbed.compare, "testcompare");
+
+    BOOST_CHECK_EQUAL(expected, actual);
+}
+
+/**
  * Confirm that we can't find a service after we have added one but before we add parameters.
  */
 BOOST_AUTO_TEST_CASE(ServiceNotFoundAfterAdd)
@@ -338,7 +354,7 @@ BOOST_AUTO_TEST_CASE(AddLocatorParamsWithoutCompareService)
         params->category = "test";
         params->service = "default";
 
-        testbed.compareManagement->addLocatorParams(params, "");
+        testbed.compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
 
         added = true;
     }
@@ -354,6 +370,26 @@ BOOST_AUTO_TEST_CASE(AddLocatorParamsWithoutCompareService)
     BOOST_CHECK(added);
 }
 
+BOOST_AUTO_TEST_CASE(AddLocatorParamsRetry)
+{
+    AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+    ServiceLocatorParamsPtr params = new ServiceLocatorParams;
+    params->category = "retry-test";
+    params->service = "default";
+
+    size_t originalLocatorCount = testbed.compareManagement->getLocatorParams().size();
+
+    testbed.compareManagement->addLocatorParams(context, params, "");
+    testbed.compareManagement->addLocatorParams(context, params, "");
+
+    size_t expected = originalLocatorCount + 1;
+    size_t actual = testbed.compareManagement->getLocatorParams().size();
+
+    BOOST_CHECK_EQUAL(expected, actual);
+}
+
+
 /**
  * Confirm that we can find a service after we add discovery params that do not use a compare service.
  */
@@ -412,7 +448,7 @@ BOOST_AUTO_TEST_CASE(AddCompare)
 
     try
     {
-        testbed.management->addCompare("testcompare", testbed.compare);
+        testbed.management->addCompare(AsteriskSCF::Operations::createContext(), "testcompare", testbed.compare);
 
         added = true;
     }
@@ -440,12 +476,13 @@ BOOST_AUTO_TEST_CASE(AddDuplicateCompare)
 
     try
     {
-        testbed.management->addCompare("testcompare", testbed.compare);
+        testbed.management->addCompare(AsteriskSCF::Operations::createContext(), "testcompare", testbed.compare);
 
         added = true;
     }
     catch (const DuplicateCompare&)
     {
+        // expected
     }
     catch (const Ice::Exception &e)
     {
@@ -460,6 +497,46 @@ BOOST_AUTO_TEST_CASE(AddDuplicateCompare)
 }
 
 /**
+ * Tests retry logic in addCompare.
+ */
+BOOST_AUTO_TEST_CASE(AddCompareRetry)
+{
+    AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+    testbed.management->addCompare(context, "testcompareretry", testbed.compare);
+    BOOST_CHECK_NO_THROW(testbed.management->addCompare(context, "testcompareretry", testbed.compare));
+}
+
+/**
+ * Tests retry logic in addCompare.
+ */
+BOOST_AUTO_TEST_CASE(AddCompareDuplicateRetry)
+{
+    testbed.management->addCompare(AsteriskSCF::Operations::createContext(), "testcompareduplicateretry", testbed.compare);
+
+    AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+    try
+    {
+        testbed.management->addCompare(context, "testcompareretry", testbed.compare);
+        BOOST_FAIL("Expected exception");
+    }
+    catch (const DuplicateCompare&)
+    {
+        // expected
+    }
+
+    try
+    {
+        testbed.management->addCompare(context, "testcompareretry", testbed.compare);
+        BOOST_FAIL("Expected exception");
+    }
+    catch (const DuplicateCompare&)
+    {
+        // expected
+    }
+}
+
+/**
  * Confirm that we can add discovery parameters with a compare service.
  */
 BOOST_AUTO_TEST_CASE(AddLocatorParamsWithCompareService)
@@ -472,7 +549,7 @@ BOOST_AUTO_TEST_CASE(AddLocatorParamsWithCompareService)
         params->category = "test2";
         params->service = "default";
 
-        testbed.compareManagement->addLocatorParams(params, "testcompare");
+        testbed.compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "testcompare");
 
         added = true;
     }
@@ -533,16 +610,17 @@ BOOST_AUTO_TEST_CASE(UseServiceFoundWithCompareService)
 BOOST_AUTO_TEST_CASE(FindMultipleServices)
 {
     ServiceLocatorParamsPtr params = new ServiceLocatorParams;
-    ServiceManagementPrx compareManagement = testbed.management->addService(testbed.compare, "testcompare2");
+    ServiceManagementPrx compareManagement = testbed.management->addService(AsteriskSCF::Operations::createContext(),
+        testbed.compare, "testcompare2");
 
     params->category = "test";
     params->service = "default";
 
-    compareManagement->addLocatorParams(params, "");
+    compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
 
     bool found = testbed.findServices("test", "", 2);
 
-    compareManagement->unregister();
+    compareManagement->unregister(AsteriskSCF::Operations::createContext());
 
     BOOST_CHECK(found);
 }
@@ -553,16 +631,17 @@ BOOST_AUTO_TEST_CASE(FindMultipleServices)
 BOOST_AUTO_TEST_CASE(FindMultipleServicesUsingEmptyCategory)
 {
     ServiceLocatorParamsPtr params = new ServiceLocatorParams;
-    ServiceManagementPrx compareManagement = testbed.management->addService(testbed.compare, "testcompare2");
+    ServiceManagementPrx compareManagement = testbed.management->addService(AsteriskSCF::Operations::createContext(),
+        testbed.compare, "testcompare2");
 
     params->category = "test";
 
-    compareManagement->addLocatorParams(params, "");
+    compareManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), params, "");
 
     // This takes into account that the service locator internally publishes an IceStorm topic manager service
     bool found = testbed.findServices("", "", 3);
 
-    compareManagement->unregister();
+    compareManagement->unregister(AsteriskSCF::Operations::createContext());
 
     BOOST_CHECK(found);
 }
@@ -576,7 +655,7 @@ BOOST_AUTO_TEST_CASE(ServiceSuspend)
 
     try
     {
-        testbed.compareManagement->suspend();
+        testbed.compareManagement->suspend(AsteriskSCF::Operations::createContext());
         suspended = true;
     }
     catch (const Ice::Exception &e)
@@ -610,7 +689,7 @@ BOOST_AUTO_TEST_CASE(ServiceUnsuspend)
 
     try
     {
-        testbed.compareManagement->unsuspend();
+        testbed.compareManagement->unsuspend(AsteriskSCF::Operations::createContext());
         unsuspended = true;
     }
     catch (const Ice::Exception &e)
@@ -644,7 +723,7 @@ BOOST_AUTO_TEST_CASE(RemoveNonexistentCompareService)
 
     try
     {
-        testbed.management->removeCompare("testcompare2");
+        testbed.management->removeCompare(AsteriskSCF::Operations::createContext(), "testcompare2");
         removed = true;
... 97 lines suppressed ...


-- 
asterisk-scf/release/servicediscovery.git



More information about the asterisk-scf-commits mailing list