[asterisk-scf-commits] asterisk-scf/release/media_transport_udptl.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue May 8 17:28:14 CDT 2012


branch "master" has been updated
       via  9e7a46fbcac528046354a88afceecf9b57c97284 (commit)
      from  d3eb271c5ee3d83f3e334dd728ff111625c85b5b (commit)

Summary of changes:
 config/test_component.config                       |    8 +-
 config/test_component_v6.config                    |   10 +-
 config/test_udptl_ice.conf                         |   14 +-
 .../Replication/UDPTL/UdptlStateReplicationIf.ice  |   12 +-
 src/Component.cpp                                  |   86 ++++++++---
 src/ICETransport.cpp                               |   68 ++++++---
 src/UDPTLConfiguration.cpp                         |   27 ++-
 src/UDPTLSession.cpp                               |   46 ++++--
 src/UDPTLSink.cpp                                  |   70 +++++++--
 src/UDPTLSink.h                                    |    8 +-
 src/UDPTLSource.cpp                                |    9 +-
 src/UDPTLSource.h                                  |    6 +-
 src/UdptlStateReplicator.h                         |    4 +-
 src/UdptlStateReplicatorListener.cpp               |  165 ++++++++++++--------
 test/CMakeLists.txt                                |    4 +
 test/TestMediaTransportUDPTL.cpp                   |  139 ++++++++++++++---
 test/TestUDPTLICE.cpp                              |   59 ++++++-
 17 files changed, 531 insertions(+), 204 deletions(-)


- Log -----------------------------------------------------------------
commit 9e7a46fbcac528046354a88afceecf9b57c97284
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue May 8 12:03:39 2012 -0500

    Changes for new retry logic.

diff --git a/config/test_component.config b/config/test_component.config
index f7fc24e..8bbc9bd 100644
--- a/config/test_component.config
+++ b/config/test_component.config
@@ -3,6 +3,7 @@
 #
 # Icebox Configuration
 #
+Ice.ThreadPool.Client.Size=4
 
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,UDPTLStateReplicator,MediaTransportUDPTL,TestMediaTransportUDPTL
@@ -28,11 +29,8 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
 IceBox.Service.MediaTransportUDPTL=MediaTransportUDPTL:create
 
 # Adapter parameters for this component
-MediaTransportUDPTLAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLogger.Endpoints=default -h 127.0.0.1
 MediaTransportUDPTL.ServiceAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1 
+MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
 
 #
 # TestMediaTransportUDPTL Configuration
@@ -46,6 +44,8 @@ IceBox.Service.TestMediaTransportUDPTL=MediaTransportUDPTLTest:create
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 10000
 ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 10001:udp -h 127.0.0.1 -p 10001
diff --git a/config/test_component_v6.config b/config/test_component_v6.config
index 2a04fed..ef809dd 100644
--- a/config/test_component_v6.config
+++ b/config/test_component_v6.config
@@ -3,7 +3,7 @@
 #
 # Icebox Configuration
 #
-
+Ice.ThreadPool.Client.Size=4
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,UDPTLStateReplicator,MediaTransportUDPTL,TestMediaTransportUDPTL
 
@@ -28,11 +28,8 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
 IceBox.Service.MediaTransportUDPTL=MediaTransportUDPTL:create
 
 # Adapter parameters for this component
-MediaTransportUDPTLAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLogger.Endpoints=default -h 127.0.0.1
 MediaTransportUDPTL.ServiceAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1 
+MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1
 
 #
 # TestMediaTransportUDPTL Configuration
@@ -46,6 +43,9 @@ IceBox.Service.TestMediaTransportUDPTL=MediaTransportUDPTLTestV6:create
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 10000
 ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 10001:udp -h 127.0.0.1 -p 10001
diff --git a/config/test_udptl_ice.conf b/config/test_udptl_ice.conf
index 9e20ebd..f54c2ef 100644
--- a/config/test_udptl_ice.conf
+++ b/config/test_udptl_ice.conf
@@ -6,14 +6,18 @@
 
 UDPTLConfiguration.Name=UDPTLStateReplicator
 
+Ice.Override.Timeout=5000
+Ice.ThreadPool.Client.Size=4
+
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,UDPTLStateReplicator,MediaTransportUDPTL,TestMediaTransportUDPTL
 
-Ice.Override.Timeout=5000
-
 # RtpStateReplicator Configuration
 IceBox.Service.UDPTLStateReplicator=UDPTLStateReplicator:create
 
+# For unit testing, we run without state replicator
+UDPTLStateReplicator.Standalone = true
+
 # Adapter parameters for this component
 UDPTLStateReplicator.Adapter.Endpoints=tcp -h 127.0.0.1:udp -h 127.0.0.1
 
@@ -30,9 +34,6 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
 IceBox.Service.MediaTransportUDPTL=MediaTransportUDPTL:create
 
 # Adapter parameters for this component
-MediaTransportUDPTLAdapter.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaTransportUDPTLAdapterLogger.Endpoints=default -h 127.0.0.1
 MediaTransportUDPTL.ServiceAdapter.Endpoints=default -h 127.0.0.1
 MediaTransportUDPTL.BackplaneAdapter.Endpoints=default -h 127.0.0.1 
 
@@ -48,6 +49,9 @@ IceBox.Service.TestMediaTransportUDPTL=MediaTransportUDPTLIceTest:create
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit testing we run without state replicator.
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -h 127.0.0.1 -p 10000
 ServiceDiscovery.IceStorm.Publish.Endpoints=tcp -h 127.0.0.1 -p 10001:udp -h 127.0.0.1 -p 10001
diff --git a/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice b/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice
index d7e65f4..8fad508 100644
--- a/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/UDPTL/UdptlStateReplicationIf.ice
@@ -54,16 +54,16 @@ module V1
 
     interface UdptlStateReplicatorListener
     {
-	void stateRemoved(Ice::StringSeq itemKeys);
-	void stateSet(UdptlStateItemSeq items);
+	idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+	idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateItemSeq items);
     };
 
     interface UdptlStateReplicator
     {
-	void addListener(UdptlStateReplicatorListener *listener);
-	void removeListener(UdptlStateReplicatorListener *listener);
-	void setState (UdptlStateItemSeq items);
-	void removeState(Ice::StringSeq items);
+	idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateReplicatorListener *listener);
+	idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateReplicatorListener *listener);
+	idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, UdptlStateItemSeq items);
+	idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
 	idempotent UdptlStateItemSeq getState(Ice::StringSeq itemKeys);
 	idempotent UdptlStateItemSeq getAllState();
     };
diff --git a/src/Component.cpp b/src/Component.cpp
index 60dadb9..260e230 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -31,6 +31,9 @@
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/Discovery/SmartProxy.h>
 #include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "UdptlReplicationContext.h"
 #include "UDPTLSession.h"
@@ -40,25 +43,30 @@
 #include "PJMediaEnvironment.h"
 
 using namespace std;
+using namespace AsteriskSCF::Configuration::UDPTL::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::UDPTL::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::UDPTL::V1;
-using namespace AsteriskSCF::Configuration::UDPTL::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
 using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
 using namespace AsteriskSCF::UDPTL;
 
 namespace
 {
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaUDPTL");
-}
 
-static const string ReplicaServiceId("MediaUdptlReplica");
-static const string MediaServiceId("UDPTLMediaService");
+const string ReplicaServiceId("MediaUdptlReplica");
+const string MediaServiceId("UDPTLMediaService");
+
+typedef ContextResultData<UDPTLSessionPrx> UDPTLSessionPrxCookie;
+
+typedef boost::shared_ptr<UDPTLSessionPrxCookie> UDPTLSessionPrxCookiePtr;
+}
 
 /**
  * Implementation of the UDPTLMediaService interface as defined in MediaUDPTLIf.ice
@@ -69,6 +77,7 @@ public:
     UDPTLMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
                           const UdptlReplicationContextPtr& replicationContext,
                           const ConfigurationServiceImplPtr& configurationService) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter),
         mEnvironment(PJMediaEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
         mReplicationContext(replicationContext),
@@ -76,10 +85,34 @@ public:
     {
     };
 
-    UDPTLSessionPrx allocate(const UDPTLServiceLocatorParamsPtr& params, const Ice::Current&)
+    UDPTLSessionPrx allocate(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const UDPTLServiceLocatorParamsPtr& params, const Ice::Current&)
     {
-        return AsteriskSCF::UDPTL::UDPTLSession::create(mAdapter, IceUtil::generateUUID(), mEnvironment, params,
-                                                        mReplicationContext, mConfigurationService);
+        std::pair<bool, UDPTLSessionPrxCookiePtr> cacheHit =
+            getContextSync<UDPTLSessionPrxCookiePtr>(mOperationContextCache, operationContext);
+
+        if (cacheHit.first)
+        {
+            return cacheHit.second->getResult();
+        }
+
+        try
+        {
+            UDPTLSessionPrx r = AsteriskSCF::UDPTL::UDPTLSession::create(mAdapter, IceUtil::generateUUID(),
+                mEnvironment, params, mReplicationContext, mConfigurationService);
+            cacheHit.second->setResult(r);
+            return r;
+        }
+        catch (const std::exception& e)
+        {
+            cacheHit.second->setException(e);
+            throw;
+        }
+        catch (...)
+        {
+            cacheHit.second->setException();
+            throw;
+        }
     };
 
     pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
@@ -90,6 +123,7 @@ public:
     }
 
 private:
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     PJMediaEnvironmentPtr mEnvironment;
     UdptlReplicationContextPtr mReplicationContext;
@@ -127,6 +161,7 @@ private:
     virtual void onPreInitialize();
     virtual void onStop();    
     virtual void onStart();
+    virtual void onActivated();
 
     // Other base Component overrides
     virtual void prepareBackplaneServicesForDiscovery();
@@ -159,12 +194,12 @@ private:
 
 void Component::onSuspend() 
 {
-    mGeneralState->serviceManagement->suspend();
+    mGeneralState->serviceManagement->suspend(AsteriskSCF::Operations::createContext());
 }
 
 void Component::onResume() 
 {
-    mGeneralState->serviceManagement->unsuspend();
+    mGeneralState->serviceManagement->unsuspend(AsteriskSCF::Operations::createContext());
 }
 
 /**
@@ -336,7 +371,7 @@ void Component::findRemoteServices()
         // replicator service. 
         ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
             udptlReplicationContext->getReplicator().initialize(), ReplicatorFacet);
-        configurationReplicator->registerConfigurationService(mConfigurationServicePrx);
+        configurationReplicator->registerConfigurationService(AsteriskSCF::Operations::createContext(), mConfigurationServicePrx);
 
     }
     catch (const std::exception& e)
@@ -389,7 +424,7 @@ void Component::listenToStateReplicators()
         // Are we in standby mode?
         if (udptlReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
         {
-            udptlReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
+            udptlReplicationContext->getReplicator()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
             mListeningToReplicator = true;
         }
     }
@@ -417,7 +452,7 @@ void Component::stopListeningToStateReplicators()
 
     try
     {
-        udptlReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
+        udptlReplicationContext->getReplicator()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
         mListeningToReplicator = false;
     }
     catch (const Ice::Exception& e)
@@ -455,7 +490,18 @@ void Component::onRegisterPrimaryServices()
     }
 
     mGeneralState->serviceManagement = mUdptlMediaServiceRegistration->getServiceManagement();
-    mGeneralState->serviceManagement->addLocatorParams(mUdptlLocatorParams, "");
+    mGeneralState->serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), mUdptlLocatorParams, "");
+}
+
+void Component::onActivated()
+{
+    UdptlReplicationContextPtr udptlReplicationContext = 
+        boost::static_pointer_cast<UdptlReplicationContext>(getReplicationContext());
+
+    UdptlStateItemSeq items;
+    items.push_back(mGeneralState);
+    UdptlStateReplicatorPrx oneway = UdptlStateReplicatorPrx::uncheckedCast(udptlReplicationContext->getReplicator()->ice_oneway());
+    oneway->setState(AsteriskSCF::Operations::createContext(), items);
 }
 
 void Component::onStart() 
@@ -468,7 +514,7 @@ void Component::onStart()
         UdptlStateItemSeq items;
         items.push_back(mGeneralState);
         UdptlStateReplicatorPrx oneway = UdptlStateReplicatorPrx::uncheckedCast(udptlReplicationContext->getReplicator()->ice_oneway());
-        oneway->setState(items);
+        oneway->setState(AsteriskSCF::Operations::createContext(), items);
     }
 }
 
@@ -476,7 +522,7 @@ void Component::onStop()
 {
     if (getReplicationContext()->isActive() == true)
     {
-       mGeneralState->serviceManagement->unregister();
+       mGeneralState->serviceManagement->unregister(AsteriskSCF::Operations::createContext());
     }
 }
 
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index db2337d..99d15f6 100644
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010,2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,39 +14,39 @@
  * at the top of the source tree.
  */
 
-#include "ICETransport.h"
-#include "PJUtil.h"
+#include <map>
+#include <sstream>
 
-#include <pjmedia.h>
-#include <pjlib.h>
-#include <pjnath.h>
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
 
-#include <AsteriskSCF/System/ExceptionsIf.h>
-#include <map>
 #include <boost/thread.hpp>
 #include <boost/thread/shared_mutex.hpp>
 
+#include <pjlib.h>
+#include <pjmedia.h>
+#include <pjnath.h>
+
+#include <AsteriskSCF/System/ExceptionsIf.h>
 #include <AsteriskSCF/System/NAT/NATTraversalIf.h>
-#include <Ice/Ice.h>
-#include <sstream>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 #include <AsteriskSCF/Logger.h>
-#include <IceUtil/UUID.h>
 
-using namespace AsteriskSCF::UDPTL;
-using namespace AsteriskSCF::System::V1;
-using namespace AsteriskSCF::PJUtil;
-using namespace std;
+#include "ICETransport.h"
+#include "PJUtil.h"
+
 using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJUtil;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::UDPTL;
+using namespace std;
 
 namespace
 {
 Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaUDPTL");
-}
-
-namespace 
-{
 
 class ICEAgentImpl : public InteractiveConnectionAgent
 {
@@ -54,6 +54,7 @@ public:
 
     ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMediaEnvironmentPtr& env,
         const PJMediaEndpointPtr& ep) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter),
         mId(id),
         mShuttingDown(false),
@@ -85,10 +86,15 @@ public:
     }
 
     void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
         const string& hostname, Ice::Int port, const CandidateSeq& candidates,
         const Ice::Current&)
     {
-
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected; ignore
+            return;
+        }
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         stateCheck();
         if (mCurrentNegotiation)
@@ -493,6 +499,7 @@ public:
 
 private:
     boost::shared_mutex mLock;
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     Ice::Identity mId;
     bool mShuttingDown;
@@ -703,6 +710,8 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
 {
     if (fail(status))
     {
+        boost::unique_lock<boost::mutex> lock(mLock);
+        mMonitor.notify_all();
         //
         // TODO!
         //
@@ -733,9 +742,9 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
             pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
         }
     }
-    boost::lock_guard<boost::mutex> lock(mLock);
+    boost::unique_lock<boost::mutex> lock(mLock);
     mLocalAddress = fromInfo(info);
-    mMonitor.notify_one();
+    mMonitor.notify_all();
 }
 
 AddressPtr ICETransport::localAddress() 
@@ -745,9 +754,21 @@ AddressPtr ICETransport::localAddress()
     {
         return mLocalAddress;
     }
+    //
+    // Retry check for local address for max of 2.5 seconds, then proceed as if it was unknown.
+    //
     for (size_t i = 0; i < 5 && !mLocalAddress; ++i)
     {
-        mMonitor.wait(lock);
+        try
+        {
+            mMonitor.timed_wait(lock, boost::posix_time::milliseconds(500));
+        }
+        catch (const boost::thread_interrupted&)
+        {
+        }
+        catch (const boost::thread_resource_error&)
+        {
+        }
     }
     return mLocalAddress;
 }
@@ -796,6 +817,7 @@ void ICETransport::start()
     {
         throw InternalInitializationException("Unable to create new ICE media transport");
     }
+
     ICECallbackAdapter::addEntry(t, shared_from_this());
     mTransport = t;
     mCallback = callback;
diff --git a/src/UDPTLConfiguration.cpp b/src/UDPTLConfiguration.cpp
index 4fe9af5..ce0f987 100644
--- a/src/UDPTLConfiguration.cpp
+++ b/src/UDPTLConfiguration.cpp
@@ -21,7 +21,6 @@
 #include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
 #include "udptl.h"
 
@@ -49,12 +48,16 @@ public:
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationGroups(const Ice::Current&);
 
-    void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&, 
+        const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
 
-    void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
-            const Ice::Current&);
-    void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
-            const Ice::Current&);
+    void removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&, 
+        const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+        const Ice::Current&);
+
+    void removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&, 
+        const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+        const Ice::Current&);
 
     int getStartPort();
     int getEndPort();
@@ -278,9 +281,12 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
     return groups;
 }
 
-void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(
+    const AsteriskSCF::System::V1::OperationContextPtr&,
+    const ConfigurationGroupSeq& groups,
     const Ice::Current&)
 {
+    // config's serial number already handles idempotency
     class GroupVisitor : public UdptlConfigurationGroupVisitor
     {
     public:
@@ -465,9 +471,10 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
     }
 }
 
-void ConfigurationServiceServant::removeConfigurationItems(
+void ConfigurationServiceServant::removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&, 
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
 {
+    // multiple removes would be naturally idempotent
     class GroupVisitor : public UdptlConfigurationGroupVisitor
     {
     public:
@@ -619,8 +626,10 @@ void ConfigurationServiceServant::removeConfigurationItems(
     }
 }
 
-void ConfigurationServiceServant::removeConfigurationGroups(const ConfigurationGroupSeq& groups, const Ice::Current&)
+void ConfigurationServiceServant::removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&, 
+    const ConfigurationGroupSeq& groups, const Ice::Current&)
 {
+    // multiple removes would be naturally idempotent
     class GroupVisitor : public UdptlConfigurationGroupVisitor
     {
     public:
diff --git a/src/UDPTLSession.cpp b/src/UDPTLSession.cpp
index b0203a5..6fb5934 100644
--- a/src/UDPTLSession.cpp
+++ b/src/UDPTLSession.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -33,18 +33,22 @@
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "udptl.h"
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::UDPTL::V1;
+using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::UDPTL::V1;
 using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF::UDPTL;
+using namespace std;
 
 /**
  * Implementation of the UDPTLSession interface as defined in MediaUDPTLIf.ice
@@ -79,12 +83,12 @@ public:
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     std::string getId(const Ice::Current&);
     void setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookies);
-    void setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
+    void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::SessionCookies& cookies,
                     const Ice::Current&);
     void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
                           const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
                           const Ice::Current&);
-    void removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
+    void removeCookies(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::SessionCookies& cookies,
                        const Ice::Current&);
     void release(const Ice::Current&);
 
@@ -113,6 +117,8 @@ private:
 
     boost::shared_mutex mLock;
 
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * Instance of the session adapter to be passed to sinks, sources, configuration, etc.
      */
@@ -222,7 +228,8 @@ UDPTLSessionImpl::UDPTLSessionImpl(const Ice::ObjectAdapterPtr& adapter,
                                    const PJMediaEnvironmentPtr& env,
                                    const UDPTLServiceLocatorParamsPtr& params,
                                    const UdptlReplicationContextPtr& replicationContext,
-                                   const ConfigurationServiceImplPtr& configurationService) : 
+                                   const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mEnvironment(env),
     mEndpoint(PJMediaEndpoint::create(env)),
     mId(id),
@@ -264,6 +271,7 @@ UDPTLSessionImpl::UDPTLSessionImpl(const Ice::ObjectAdapterPtr& adapter,
                                    bool ipv6,
                                    const UdptlReplicationContextPtr& replicationContext,
                                    const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mEnvironment(env),
     mEndpoint(PJMediaEndpoint::create(env)),
     mId(sessionIdentity),
@@ -327,11 +335,17 @@ void UDPTLSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDic
 /**
  * Support for the corresponding API call.
  */
-void UDPTLSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
-                                  const Ice::Current&)
+void UDPTLSessionImpl::setCookies(const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const AsteriskSCF::Media::V1::SessionCookies& cookies,
+    const Ice::Current&)
 {
     { // scope the lock
         boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
         for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
              i != cookies.end();  ++i)
         {
@@ -376,11 +390,17 @@ void UDPTLSessionImpl::getCookies_async(
 /**
  * Implementation of the corresponding API call.
  */
-void UDPTLSessionImpl::removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies,
-                                   const Ice::Current&)
+void UDPTLSessionImpl::removeCookies(const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const AsteriskSCF::Media::V1::SessionCookies& cookies,
+    const Ice::Current&)
 {
     { // scope the lock
         boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
         for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
              i != cookies.end(); ++i)
         {
@@ -470,7 +490,7 @@ void UDPTLSessionImpl::replicateState(const UdptlSessionStateItemPtr& session, c
         return;
     }
 
-    mReplicationContext->getReplicator().tryOneWay()->setState(items);
+    mReplicationContext->getReplicator().tryOneWay()->setState(AsteriskSCF::Operations::createContext(), items);
 }
 
 /**
@@ -507,7 +527,7 @@ void UDPTLSessionImpl::removeState(const UdptlSessionStateItemPtr& session, cons
         return;
     }
 
-    mReplicationContext->getReplicator().tryOneWay()->removeState(items);
+    mReplicationContext->getReplicator().tryOneWay()->removeState(AsteriskSCF::Operations::createContext(), items);
 }
 
 UDPTLSessionPrx UDPTLSessionImpl::activate(const string& id)
diff --git a/src/UDPTLSink.cpp b/src/UDPTLSink.cpp
index 319e530..84bb021 100644
--- a/src/UDPTLSink.cpp
+++ b/src/UDPTLSink.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -20,22 +20,26 @@
 #include <pjlib.h>
 #include <pjmedia.h>
 
+#include <boost/thread.hpp>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
 
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "udptl.h"
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::UDPTL::V1;
+using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Network::V1;
-using namespace AsteriskSCF::UDPTL;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::UDPTL::V1;
+using namespace AsteriskSCF::UDPTL;
+using namespace std;
 
 /**
  * Private implementation details for the StreamSinkUDPTLImpl class.
@@ -46,11 +50,15 @@ public:
     /**
      * Constructor for our StreamSinkUDPTLImplPriv class.
      */
-    StreamSinkUDPTLImplPriv(const SessionAdapterPtr& sessionAdapter, 
+    StreamSinkUDPTLImplPriv(const SessionAdapterPtr& sessionAdapter,
                             const PJMediaTransportPtr& transport,
                             const std::string&,
                             struct udptl *udptl);
 
+    boost::shared_mutex mMutex;
+
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * A pointer to the UDPTL session we are associated with.
      */
@@ -80,13 +88,14 @@ public:
 /**
  * Constructor for the StreamSinkUDPTLImplPriv class.
  */
-StreamSinkUDPTLImplPriv::StreamSinkUDPTLImplPriv(const SessionAdapterPtr& session, 
+StreamSinkUDPTLImplPriv::StreamSinkUDPTLImplPriv(const SessionAdapterPtr& session,
                                                  const PJMediaTransportPtr& transport,
                                                  const string& sessionId,
                                                  struct udptl *udptl) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mSessionAdapter(session),
     mTransport(transport),
-    mSinkStateItem(new UdptlStreamSinkStateItem), 
+    mSinkStateItem(new UdptlStreamSinkStateItem),
     mSessionId(sessionId),
     mUdptl(udptl)
 {
@@ -111,6 +120,7 @@ StreamSinkUDPTLImpl::StreamSinkUDPTLImpl(const SessionAdapterPtr& session,
  */
 void StreamSinkUDPTLImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     // Don't even bother if no remote address information is present
     if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
     {
@@ -139,8 +149,10 @@ void StreamSinkUDPTLImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames,
 /**
  * Implementation of the setSource method as defined in MediaIf.ice
  */
-void StreamSinkUDPTLImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+void StreamSinkUDPTLImpl::setSource(const AsteriskSCF::System::V1::OperationContextPtr& context, const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
 {
+    // multiple sets naturally idempotent
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->source = source;
 
     mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -151,6 +163,7 @@ void StreamSinkUDPTLImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePr
  */
 AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkUDPTLImpl::getSource(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSinkStateItem->source;
 }
 
@@ -159,6 +172,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkUDPTLImpl::getSource(const Ice
  */
 AsteriskSCF::Media::V1::FormatSeq StreamSinkUDPTLImpl::getFormats(const Ice::Current&)
 {
+    // no access to local state - no need to lock
     FormatSeq formats;
     return formats;
 }
@@ -168,6 +182,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkUDPTLImpl::getFormats(const Ice::Cur
  */
 std::string StreamSinkUDPTLImpl::getId(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     /* For now utilize the id of the session */
     return mImpl->mSessionId;
 }
@@ -175,8 +190,17 @@ std::string StreamSinkUDPTLImpl::getId(const Ice::Current&)
 /**
  * Implementation of the setRemoteDetails method as defined in NetworkIf.ice
  */
-void StreamSinkUDPTLImpl::setRemoteDetails(const string& address, Ice::Int port, const Ice::Current&)
+void StreamSinkUDPTLImpl::setRemoteDetails(
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const string& address, Ice::Int port,
+    const Ice::Current&)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (!mImpl->mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
     mImpl->mSessionAdapter->setRemoteDetails(address, port);
 
     mImpl->mSinkStateItem->remoteAddress = address;
@@ -190,14 +214,21 @@ void StreamSinkUDPTLImpl::setRemoteDetails(const string& address, Ice::Int port,
  */
 AddressInformation StreamSinkUDPTLImpl::getRemoteDetails(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return AddressInformation(mImpl->mSinkStateItem->remoteAddress, mImpl->mSinkStateItem->remotePort);
 }
 
 /**
  * Implementation of the setFarMaxDatagram method as defined in MediaUDPTLIf.ice
  */
-void StreamSinkUDPTLImpl::setFarMaxDatagram(int datagram, const Ice::Current&)
+void StreamSinkUDPTLImpl::setFarMaxDatagram(const AsteriskSCF::System::V1::OperationContextPtr& context, int datagram, const Ice::Current&)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (!mImpl->mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
     udptl_set_far_max_datagram(mImpl->mUdptl, datagram);
 
     mImpl->mSinkStateItem->farMaxDatagram = datagram;
@@ -209,6 +240,7 @@ void StreamSinkUDPTLImpl::setFarMaxDatagram(int datagram, const Ice::Current&)
  */
 Ice::Int StreamSinkUDPTLImpl::getFarMaxDatagram(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return udptl_get_far_max_datagram(mImpl->mUdptl);
 }
 
@@ -217,14 +249,22 @@ Ice::Int StreamSinkUDPTLImpl::getFarMaxDatagram(const Ice::Current&)
  */
 Ice::Int StreamSinkUDPTLImpl::getFarMaxIFP(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return udptl_get_far_max_ifp(mImpl->mUdptl);
 }
 
 /**
  * Implementation of the setErrorCorrectionScheme method as defined in MediaUDPTLIf.ice
  */
-void StreamSinkUDPTLImpl::setErrorCorrectionScheme(ErrorCorrectionScheme scheme, const Ice::Current&)
+void StreamSinkUDPTLImpl::setErrorCorrectionScheme(
+    const AsteriskSCF::System::V1::OperationContextPtr& context, ErrorCorrectionScheme scheme, const Ice::Current&)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+    if (!mImpl->mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
     enum t38_ec_modes mode;
 
     if (scheme == FEC)
@@ -251,6 +291,7 @@ void StreamSinkUDPTLImpl::setErrorCorrectionScheme(ErrorCorrectionScheme scheme,
  */
 ErrorCorrectionScheme StreamSinkUDPTLImpl::getErrorCorrectionScheme(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     enum t38_ec_modes mode = udptl_get_error_correction_scheme(mImpl->mUdptl);
 
     if (mode == UDPTL_ERROR_CORRECTION_FEC)
@@ -272,27 +313,32 @@ ErrorCorrectionScheme StreamSinkUDPTLImpl::getErrorCorrectionScheme(const Ice::C
  */
 UdptlStreamSinkStateItemPtr StreamSinkUDPTLImpl::getStateItem()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSinkStateItem;
 }
 
 void StreamSinkUDPTLImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->remoteAddress = host;
     mImpl->mSinkStateItem->remotePort = port;
 }
 
 void StreamSinkUDPTLImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->source = proxy;
 }
 
 void StreamSinkUDPTLImpl::setFarMaxDatagramImpl(int datagram)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     udptl_set_far_max_datagram(mImpl->mUdptl, datagram);
 }
 
 void StreamSinkUDPTLImpl::setErrorCorrectionSchemeImpl(ErrorCorrectionScheme scheme)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     enum t38_ec_modes mode;
 
     if (scheme == FEC)
@@ -309,4 +355,4 @@ void StreamSinkUDPTLImpl::setErrorCorrectionSchemeImpl(ErrorCorrectionScheme sch
     }
 
     udptl_set_error_correction_scheme(mImpl->mUdptl, mode);
-}    
+}
diff --git a/src/UDPTLSink.h b/src/UDPTLSink.h
index 4357106..7bee459 100644
--- a/src/UDPTLSink.h
+++ b/src/UDPTLSink.h
@@ -35,17 +35,17 @@ public:
      * AsteriskSCF::Media::UDPTL::V1::StreamSinkUDPTL implementation.
      */
     void write(const AsteriskSCF::Media::V1::FrameSeq&, const Ice::Current&);
-    void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
+    void setSource(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
     AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
     std::string getId(const Ice::Current&);
-    void setRemoteDetails(const std::string&, Ice::Int, const Ice::Current&);
+    void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string&, Ice::Int, const Ice::Current&);
     AsteriskSCF::Network::V1::AddressInformation getRemoteDetails(const Ice::Current&);
 
-    void setFarMaxDatagram(Ice::Int, const Ice::Current&);
+    void setFarMaxDatagram(const AsteriskSCF::System::V1::OperationContextPtr&, Ice::Int, const Ice::Current&);
     Ice::Int getFarMaxDatagram(const Ice::Current&);
     Ice::Int getFarMaxIFP(const Ice::Current&);
-    void setErrorCorrectionScheme(AsteriskSCF::Media::UDPTL::V1::ErrorCorrectionScheme, const Ice::Current&);
+    void setErrorCorrectionScheme(const AsteriskSCF::System::V1::OperationContextPtr&, AsteriskSCF::Media::UDPTL::V1::ErrorCorrectionScheme, const Ice::Current&);
     AsteriskSCF::Media::UDPTL::V1::ErrorCorrectionScheme getErrorCorrectionScheme(const Ice::Current&);
 
     /**
diff --git a/src/UDPTLSource.cpp b/src/UDPTLSource.cpp
index 44ce10c..a1961bc 100644
--- a/src/UDPTLSource.cpp
+++ b/src/UDPTLSource.cpp
@@ -121,8 +121,9 @@ StreamSourceUDPTLImpl::StreamSourceUDPTLImpl(const SessionAdapterPtr& session,
 /**
  * Implementation of the addSink method as defined in MediaIf.ice
  */
-void StreamSourceUDPTLImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceUDPTLImpl::addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
+    // naturally idempotent - no extra retry logic needed
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 
     // Do not allow the same sink to be added multiple times
@@ -142,8 +143,9 @@ void StreamSourceUDPTLImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&
 /**
  * Implementation of the removeSink method as defined in MediaIf.ice
  */
-void StreamSourceUDPTLImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceUDPTLImpl::removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
+    // naturally idempotent - no extra retry logic needed
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 
     mImpl->mSourceStateItem->sinks.erase(std::remove(mImpl->mSourceStateItem->sinks.begin(),
@@ -182,8 +184,9 @@ std::string StreamSourceUDPTLImpl::getId(const Ice::Current&)
 /**
  * Implementation of the requestFormat method as defined in MediaIf.ice
  */
-void StreamSourceUDPTLImpl::requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
+void StreamSourceUDPTLImpl::requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
 {
+    // naturally idempotent - no extra retry logic needed
     // We do not currently support switching formats. 
     throw MediaFormatSwitchException();
 }
diff --git a/src/UDPTLSource.h b/src/UDPTLSource.h
index fede4a7..f17c98d 100644
--- a/src/UDPTLSource.h
+++ b/src/UDPTLSource.h
@@ -31,12 +31,12 @@ public:
                           const std::string& parentSessionId,
                           struct udptl *udptl);
 
-    void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
-    void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+    void addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+    void removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
     std::string getId(const Ice::Current&);
-    void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
+    void requestFormat(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
 
     AsteriskSCF::Network::V1::AddressInformation getLocalDetails(const Ice::Current&);
 
diff --git a/src/UdptlStateReplicator.h b/src/UdptlStateReplicator.h
index 845572a..cf665fb 100644
--- a/src/UdptlStateReplicator.h
+++ b/src/UdptlStateReplicator.h
@@ -47,8 +47,8 @@ public:
                                   const UdptlReplicationContextPtr& replicationContext,
                                   const ConfigurationServiceImplPtr&);
     ~UdptlStateReplicatorListenerI();
-    void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
-    void stateSet(const AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemSeq&, const Ice::Current&);
+    void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+    void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemSeq&, const Ice::Current&);
     bool operator==(const UdptlStateReplicatorListenerI &rhs);
 private:
     boost::shared_ptr<UdptlStateReplicatorListenerImpl> mImpl;
diff --git a/src/UdptlStateReplicatorListener.cpp b/src/UdptlStateReplicatorListener.cpp
index 5c34b68..82aed82 100644
--- a/src/UdptlStateReplicatorListener.cpp
+++ b/src/UdptlStateReplicatorListener.cpp
@@ -20,15 +20,18 @@
 #include <boost/shared_ptr.hpp>
 
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "UdptlStateReplicator.h"
 #include "ReplicationAdapter.h"
 #include "UDPTLSession.h"
 
-using namespace std;
 using namespace AsteriskSCF::Media::UDPTL::V1;
-using namespace AsteriskSCF::UDPTL;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::Replication::UDPTL::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::UDPTL;
+using namespace std;
 
 class UdptlStateReplicatorItem
 {
@@ -36,7 +39,19 @@ public:
 
     ~UdptlStateReplicatorItem()
     {
-        mSession->destroy();
+        if (mSession)
+        {
+            try
+            {
+                mSession->destroy();
+            }
+            catch (...)
+            {
+                //
+                // Can't do much in a destructor!
+                //
+            }
+        }
     }
 
     //
@@ -61,93 +76,108 @@ struct UdptlStateReplicatorListenerImpl
 {
 public:
     UdptlStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter,
-                                     const PJMediaEnvironmentPtr& env, 
+                                     const PJMediaEnvironmentPtr& env,
                                      const UdptlGeneralStateItemPtr& generalState,
                                      const UdptlReplicationContextPtr& replicationContext,
                                      const ConfigurationServiceImplPtr& configurationService)
-        : mId(IceUtil::generateUUID()), 
-          mAdapter(adapter), 
+        : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+          mId(IceUtil::generateUUID()),
+          mAdapter(adapter),
           mEnvironment(env),
           mGeneralState(generalState),
           mReplicationContext(replicationContext),
           mConfigurationService(configurationService) {}
-    
-    void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
+
+    void removeStateNoticeImpl(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::StringSeq& itemKeys)
     {
+        boost::mutex::scoped_lock lock(mMutex);
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
         for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
         {
             // Just erasing this from the map will cause the destructor to actually shut things down
             mStateItems.erase((*key));
         }
     }
-    
-    void setStateNoticeImpl(const UdptlStateItemSeq& items)
-    {
-    class visitor : public AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitor
-    {
-    public:
-            visitor(UdptlStateReplicatorListenerImpl *impl) : mImpl(impl)
-        {
-        }
 
-    private:
-        UdptlStateReplicatorListenerImpl *mImpl;
+    void setStateNoticeImpl(const AsteriskSCF::System::V1::OperationContextPtr& context, const UdptlStateItemSeq& items)
+    {
+        boost::mutex::scoped_lock lock(mMutex);
 
-        void visitUdptlGeneralStateItem(const UdptlGeneralStateItemPtr &item)
+        if (!mOperationContextCache->addOperationContext(context))
         {
-            mImpl->mGeneralState->serviceManagement = item->serviceManagement;
+            // retry detected
+            return;
         }
-            
-        void visitUdptlSessionStateItem(const UdptlSessionStateItemPtr &item)
-        {
-            map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
-            boost::shared_ptr<UdptlStateReplicatorItem> localitem;
 
-            if (i == mImpl->mStateItems.end())
+        class visitor : public AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitor
+        {
+        public:
+            visitor(UdptlStateReplicatorListenerImpl *impl) : mImpl(impl)
             {
-                boost::shared_ptr<UdptlStateReplicatorItem> newitem(new UdptlStateReplicatorItem());
-                localitem = newitem;
-                mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
-
-                localitem->setSession(
-                    AsteriskSCF::UDPTL::UDPTLSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
-                                                             mImpl->mReplicationContext, mImpl->mConfigurationService));
             }
-            else
+
+        private:
+            UdptlStateReplicatorListenerImpl *mImpl;
+
+            void visitUdptlGeneralStateItem(const UdptlGeneralStateItemPtr &item)
             {
-                localitem = i->second;
+                mImpl->mGeneralState->serviceManagement = item->serviceManagement;
             }
-        
-            //
-            // TODO: This appears to happen in testing on occasion. Should verify if this should be
-            // expected.
-            //
-            if (localitem)
+
+            void visitUdptlSessionStateItem(const UdptlSessionStateItemPtr &item)
             {
-                localitem->getSession()->update(item);
+                map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
+                boost::shared_ptr<UdptlStateReplicatorItem> localitem;
+
+                if (i == mImpl->mStateItems.end())
+                {
+                    boost::shared_ptr<UdptlStateReplicatorItem> newitem(new UdptlStateReplicatorItem());
+                    localitem = newitem;
+                    mImpl->mStateItems.insert(make_pair(item->sessionId, newitem));
+
+                    localitem->setSession(
+                        AsteriskSCF::UDPTL::UDPTLSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
+                            mImpl->mReplicationContext, mImpl->mConfigurationService));
+                }
+                else
+                {
+                    localitem = i->second;
+                }
+
+                //
+                // TODO: This appears to happen in testing on occasion. Should verify if this should be
+                // expected.
+                //
+                if (localitem)
+                {
+                    localitem->getSession()->update(item);
+                }
             }
-        }
-            
-        void visitUdptlStreamSinkStateItem(const UdptlStreamSinkStateItemPtr &item)
-        {
-            map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
-                        mImpl->mStateItems.find(item->sessionId);
-            if (i != mImpl->mStateItems.end())
+
+            void visitUdptlStreamSinkStateItem(const UdptlStreamSinkStateItemPtr &item)
             {
-                i->second->getSession()->update(item);
-            }
-        }
-            
-        void visitUdptlStreamSourceStateItem(const UdptlStreamSourceStateItemPtr &item)
-        {
-            map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
+                map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
                     mImpl->mStateItems.find(item->sessionId);
-            if (i != mImpl->mStateItems.end())
+                if (i != mImpl->mStateItems.end())
+                {
+                    i->second->getSession()->update(item);
+                }
+            }
+
+            void visitUdptlStreamSourceStateItem(const UdptlStreamSourceStateItemPtr &item)
             {
-                i->second->getSession()->update(item);
+                map<string, boost::shared_ptr<UdptlStateReplicatorItem> >::iterator i =
+                    mImpl->mStateItems.find(item->sessionId);
+                if (i != mImpl->mStateItems.end())
+                {
+                    i->second->getSession()->update(item);
+                }
             }
-        }
-    };
+        };
 
         AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitorPtr v = new visitor(this);
 
@@ -157,7 +187,9 @@ public:
         }
     }
 
-    string mId;
+    boost::mutex mMutex;
+    OperationContextCachePtr mOperationContextCache;
+    const string mId;
     map<string, boost::shared_ptr<UdptlStateReplicatorItem> > mStateItems;
     Ice::ObjectAdapterPtr mAdapter;
     PJMediaEnvironmentPtr mEnvironment;
@@ -177,17 +209,18 @@ UdptlStateReplicatorListenerI::~UdptlStateReplicatorListenerI()
 {
 }
 
-void UdptlStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
+void UdptlStateReplicatorListenerI::stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::StringSeq& itemKeys, const Ice::Current&)
 {
-    mImpl->removeStateNoticeImpl(itemKeys);
+    mImpl->removeStateNoticeImpl(context, itemKeys);
 }
 
-void UdptlStateReplicatorListenerI::stateSet(const UdptlStateItemSeq& items, const Ice::Current&)
+void UdptlStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr& context, const UdptlStateItemSeq& items, const Ice::Current&)
 {
-    mImpl->setStateNoticeImpl(items);
+    mImpl->setStateNoticeImpl(context, items);
 }
 
 bool UdptlStateReplicatorListenerI::operator==(const UdptlStateReplicatorListenerI &rhs)
 {
+    // reading immutable variables; no lock needed
     return mImpl->mId == rhs.mImpl->mId;
 }
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 5644864..12a6e45 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -5,6 +5,7 @@ astscf_component_add_slices(MediaTransportUDPTLTest PROJECT AsteriskSCF/Replicat
 astscf_component_add_boost_libraries(MediaTransportUDPTLTest unit_test_framework thread date_time)
 astscf_component_add_slice_collection_libraries(MediaTransportUDPTLTest ASTSCF)
 astscf_component_build_icebox(MediaTransportUDPTLTest)
+target_link_libraries(MediaTransportUDPTLTest ASTSCFIceUtilCpp)
 
 astscf_component_init(MediaTransportUDPTLTestV6)
 astscf_component_add_files(MediaTransportUDPTLTestV6 TestMediaTransportUDPTL.cpp)
@@ -13,6 +14,7 @@ astscf_component_add_boost_libraries(MediaTransportUDPTLTestV6 unit_test_framewo
 astscf_component_add_slice_collection_libraries(MediaTransportUDPTLTestV6 ASTSCF)
 astscf_component_build_icebox(MediaTransportUDPTLTestV6)
 set_property(TARGET MediaTransportUDPTLTestV6 PROPERTY COMPILE_DEFINITIONS IPV6Test)
+target_link_libraries(MediaTransportUDPTLTestV6 ASTSCFIceUtilCpp)
 
 astscf_component_init(MediaTransportUDPTLIceTest)
 astscf_component_add_files(MediaTransportUDPTLIceTest TestUDPTLICE.cpp)
@@ -21,6 +23,8 @@ astscf_component_add_slices(MediaTransportUDPTLIceTest PROJECT AsteriskSCF/Confi
 astscf_component_add_boost_libraries(MediaTransportUDPTLIceTest unit_test_framework thread date_time)
 astscf_component_add_slice_collection_libraries(MediaTransportUDPTLIceTest ASTSCF)
 astscf_component_build_icebox(MediaTransportUDPTLIceTest)
+target_link_libraries(MediaTransportUDPTLIceTest ASTSCFIceUtilCpp)
+
 pjproject_link(MediaTransportUDPTLIceTest pjlib)
 
 astscf_test_icebox(MediaTransportUDPTLTest config/test_component.config)
diff --git a/test/TestMediaTransportUDPTL.cpp b/test/TestMediaTransportUDPTL.cpp
index 4e6bba9..ba58567 100644
--- a/test/TestMediaTransportUDPTL.cpp
+++ b/test/TestMediaTransportUDPTL.cpp
@@ -35,6 +35,8 @@
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
 #include <AsteriskSCF/Media/NetworkIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
 
 #include "UdptlStateReplicationIf.h"
 
@@ -45,6 +47,7 @@ using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::UDPTL::V1;
 using namespace AsteriskSCF::Replication::UDPTL::V1;
 using namespace AsteriskSCF::Network::V1;
+using namespace AsteriskSCF::System::Component::V1;
 
 /**
  * Test service, for loading into icebox
@@ -70,8 +73,8 @@ static ArgCacheType mCachedArgs;
 class TestUdptlReplicatorListener : public UdptlStateReplicatorListener
 {
 public:
-    void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
-    void stateSet(const UdptlStateItemSeq&, const Ice::Current&);
+    void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&);
+    void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const UdptlStateItemSeq&, const Ice::Current&);
     
     UdptlGeneralStateItemPtr mGeneral;
     UdptlSessionStateItemPtr mSession;
@@ -145,11 +148,13 @@ public:
 };
 static SharedTestData Testbed;
 
-void TestUdptlReplicatorListener::stateRemoved(const Ice::StringSeq&, const Ice::Current&)
+void TestUdptlReplicatorListener::stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, 
+    const Ice::StringSeq&, const Ice::Current&)
 {
 }
 
-void TestUdptlReplicatorListener::stateSet(const UdptlStateItemSeq& items, const Ice::Current&)
+void TestUdptlReplicatorListener::stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, 
+    const UdptlStateItemSeq& items, const Ice::Current&)
 {
     class visitor : public AsteriskSCF::Replication::UDPTL::V1::UdptlStateItemVisitor
     {
@@ -215,7 +220,8 @@ public:
     /**
      * Implementation of the setSource method which just stores the source away for later retrieval.
      */
-    void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+    void setSource(const AsteriskSCF::System::V1::OperationContextPtr&, 
+        const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
     {
         mSource = source;
     }
@@ -283,9 +289,27 @@ struct GlobalIceFixture
 
             Testbed.locator = ServiceLocatorPrx::checkedCast(Testbed.communicator->stringToProxy("LocatorService:tcp -h 127.0.0.1 -p 4411"));
 
-            if (!Testbed.locator) {
+            if (!Testbed.locator) 
+            {
                 throw "Invalid service locator proxy";
             }
+
+            ServiceLocatorParamsPtr primaryReplicaParams(new ServiceLocatorParams);
+            primaryReplicaParams->category = "MediaUDPTLService.Replica";
+            primaryReplicaParams->service = "default";
+            primaryReplicaParams->id = "MediaTransportUDPTL";
+
+            ReplicaPrx mPrimaryReplica;
+            try
+            {
+                mPrimaryReplica = ReplicaPrx::uncheckedCast(Testbed.locator->locate(primaryReplicaParams));
+            }
+            catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+            {
+                throw;
+            }
+
+            mPrimaryReplica->activate(AsteriskSCF::Operations::createContext());
         }
         catch (const Ice::Exception& ex)
         {
@@ -378,7 +402,7 @@ BOOST_AUTO_TEST_CASE(AddListenertoStateReplicator)
 
     try
     {
-       Testbed.mStateReplicator->addListener(Testbed.mListenerProxy);
+       Testbed.mStateReplicator->addListener(AsteriskSCF::Operations::createContext(), Testbed.mListenerProxy);
 
        added = true;
     }
@@ -424,7 +448,7 @@ BOOST_AUTO_TEST_CASE(AllocateUDPTLSession)
         params->category = "udptl";
         params->service = "";
 #ifdef IPV6_TEST
-    params->ipv6 = true;
+        params->ipv6 = true;
 #endif
 
         UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
@@ -432,7 +456,7 @@ BOOST_AUTO_TEST_CASE(AllocateUDPTLSession)
         // You might think "geez, this should deadlock due to state replication" but no, we use one ways for that
         boost::mutex::scoped_lock lock(Testbed.mLock);
 
-        Testbed.session = service->allocate(params);
+        Testbed.session = service->allocate(AsteriskSCF::Operations::createContext(), params);
 
         // Give the UDPTL component time to replicate this session
         Testbed.mCondition.wait(lock);
@@ -699,11 +723,11 @@ BOOST_AUTO_TEST_CASE(ConfirmRemoteUDPTLAddressSetting)
             StreamSinkUDPTLPrx sink = StreamSinkUDPTLPrx::checkedCast((*i));
 
 #ifdef IPV6_TEST
-        std::string address = "::1";
+            std::string address = "::1";
 #else
-        std::string address = "127.0.0.1";
+            std::string address = "127.0.0.1";
 #endif
-            sink->setRemoteDetails(address, 10000);
+            sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), address, 10000);
 
             AddressInformation info = sink->getRemoteDetails();
             if (info.ipAddress == address && info.port == 10000)
@@ -739,7 +763,7 @@ BOOST_AUTO_TEST_CASE(ConfirmSinkSetting)
         {
             StreamSourceUDPTLPrx source = StreamSourceUDPTLPrx::checkedCast((*i));
 
-            source->addSink(Testbed.sink);
+            source->addSink(AsteriskSCF::Operations::createContext(), Testbed.sink);
 
             StreamSinkPrx sink = source->getSinks().front();
 
@@ -783,7 +807,7 @@ BOOST_AUTO_TEST_CASE(SetupUDPTLLoopback)
 
         boost::mutex::scoped_lock lock(Testbed.mLock);
     
-        sink->setRemoteDetails(info.ipAddress, info.port);
+        sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), info.ipAddress, info.port);
 
         looped = true;
 
@@ -815,7 +839,7 @@ BOOST_AUTO_TEST_CASE(SetFarMaxDatagram)
 
         boost::mutex::scoped_lock lock(Testbed.mLock);
 
-        sink->setFarMaxDatagram(1000);
+        sink->setFarMaxDatagram(AsteriskSCF::Operations::createContext(), 1000);
 
         set = true;
 
@@ -847,7 +871,7 @@ BOOST_AUTO_TEST_CASE(SetErrorCorrectionScheme)
 
         boost::mutex::scoped_lock lock(Testbed.mLock);
 
-        sink->setErrorCorrectionScheme(AsteriskSCF::Media::UDPTL::V1::REDUNDANCY);
+        sink->setErrorCorrectionScheme(AsteriskSCF::Operations::createContext(), AsteriskSCF::Media::UDPTL::V1::REDUNDANCY);
 
         set = true;
 
@@ -904,16 +928,16 @@ BOOST_AUTO_TEST_CASE(SetInvalidAddressFamilyAddress)
         {
             StreamSinkUDPTLPrx sink = StreamSinkUDPTLPrx::checkedCast((*i));
 
-        /* No, these are not accidentally reversed. We want to try to set an IPv4 address
-         * on an IPv6 sink and vice versa to make sure that an exception does get thrown.
-         */
+            /* No, these are not accidentally reversed. We want to try to set an IPv4 address
+             * on an IPv6 sink and vice versa to make sure that an exception does get thrown.
+             */
 #ifdef IPV6_TEST
-            sink->setRemoteDetails("127.0.0.1", 10000);
+            sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), "127.0.0.1", 10000);
 #else
-        sink->setRemoteDetails("::1", 10000);
+            sink->setRemoteDetails(AsteriskSCF::Operations::createContext(), "::1", 10000);
 #endif
 
-        set = true;
+            set = true;
         }
     }
     catch (const Ice::Exception &e)
@@ -1013,6 +1037,77 @@ BOOST_AUTO_TEST_CASE(ReleaseUDPTLSession)
     BOOST_CHECK(released);
 }
 
+/**
+ * Ensure that multiple calls to allocate return different proxies.
+ */
+BOOST_AUTO_TEST_CASE(AllocateDoubleCalls)
+{
+    UDPTLServiceLocatorParamsPtr params = new UDPTLServiceLocatorParams();
+    params->category = "udptl";
+    params->service = "";
+
+    UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
+
+    UDPTLSessionPrx expected = service->allocate(AsteriskSCF::Operations::createContext(), params);
+    UDPTLSessionPrx actual = service->allocate(AsteriskSCF::Operations::createContext(), params);
+
+    BOOST_REQUIRE_NE(expected, actual);
+}
+
+/**
+ * Test retry logic in ::allocate
+ */
+BOOST_AUTO_TEST_CASE(AllocateRetry)
+{
+    UDPTLServiceLocatorParamsPtr params = new UDPTLServiceLocatorParams();
+    params->category = "udptl";
+    params->service = "";
+
+    UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
+    AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+    UDPTLSessionPrx expected = service->allocate(context, params);
+    UDPTLSessionPrx actual = service->allocate(context, params);
+
+    BOOST_REQUIRE_EQUAL(expected, actual);
+}
+
+/**
+ * Test retry logic in ::allocate, when it throws an exception
+ */
+BOOST_AUTO_TEST_CASE(ThrowingAllocateRetry)
+{
+    UDPTLOverICEServiceLocatorParamsPtr params = new UDPTLOverICEServiceLocatorParams();
+    params->category = "udptl";
+    params->service = "";
+
+    UDPTLMediaServicePrx service = UDPTLMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
+    AsteriskSCF::System::V1::OperationContextPtr context = AsteriskSCF::Operations::createContext();
+
+    // allocating for ICE when ICE isn't enabled should throw an exception
+    params->enableICE = true;
+
+    try
+    {
+        service->allocate(context, params);
+        BOOST_FAIL("Expected allocation to throw an exception");
+    }
+    catch (const SessionAllocationFailure&)
+    {
+        // expected
+    }
+
+    try
+    {
+        service->allocate(context, params);
+        BOOST_FAIL("Expected allocation to throw an exception");
+    }
+    catch (const SessionAllocationFailure&)
+    {
+        // expected
+    }
+}
+
 void UDPTLTest::start(std::string const& name,
     Ice::CommunicatorPtr const&,
     Ice::StringSeq const& args)
diff --git a/test/TestUDPTLICE.cpp b/test/TestUDPTLICE.cpp
index 546d9a3..596f5bf 100644
--- a/test/TestUDPTLICE.cpp
+++ b/test/TestUDPTLICE.cpp
@@ -29,10 +29,12 @@
 #include <boost/thread/condition.hpp>
 #include <Ice/Ice.h>
 #include <Ice/BuiltinSequences.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/UDPTL/MediaUDPTLIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
 
 //
 // An attempt to get some reasonable code coverage and verify that the basic *premise* of the functionality works as
@@ -48,6 +50,8 @@ using namespace AsteriskSCF::Media::UDPTL::V1;
 using namespace AsteriskSCF::System::Configuration::V1;
 using namespace AsteriskSCF::Replication::UDPTL::V1;
 using namespace AsteriskSCF::Configuration::UDPTL::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::Core::Discovery::V1;
 
 namespace
 {
@@ -55,11 +59,11 @@ namespace
 class TestReplicatorListener : public UdptlStateReplicatorListener
 {
 public:
-    void stateRemoved(const Ice::StringSeq&, const Ice::Current&)
+    void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr&, const Ice::StringSeq&, const Ice::Current&)
     {
     }
 
-    void stateSet(const UdptlStateItemSeq&, const Ice::Current&)
+    void stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const UdptlStateItemSeq&, const Ice::Current&)
     {
     }
     
@@ -114,6 +118,32 @@ public:
         BOOST_TEST_MESSAGE("Creating test fixture");
         ::boost::debug::detect_memory_leaks(false);
         ::boost::unit_test::unit_test_log.set_stream(cout);
+
+        IceEnvironment iceEnv;
+
+        ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(iceEnv.getCommunicator()->stringToProxy("LocatorService:tcp -h 127.0.0.1 -p 4411"));
+
+        if (!locator) 
+        {
+            throw "Invalid service locator proxy";
+        }
+
+        ServiceLocatorParamsPtr primaryReplicaParams(new ServiceLocatorParams);
+        primaryReplicaParams->category = "MediaUDPTLService.Replica";
+        primaryReplicaParams->service = "default";
+        primaryReplicaParams->id = "MediaTransportUDPTL";
+
+        ReplicaPrx mPrimaryReplica;
+        try
+        {
+            mPrimaryReplica = ReplicaPrx::uncheckedCast(locator->locate(primaryReplicaParams));
+        }
+        catch(const AsteriskSCF::Core::Discovery::V1::ServiceNotFound&)
+        {
+            throw;
+        }
+
+        mPrimaryReplica->activate(AsteriskSCF::Operations::createContext());
     }
 
     ~TestFixture()
@@ -137,7 +167,22 @@ ConfigurationServicePrx locateConfigurationService(const ServiceLocatorPrx& loca
 {
     ServiceLocatorParamsPtr query = new ServiceLocatorParams;
     query->category = ConfigurationDiscoveryCategory;
-    return ConfigurationServicePrx::uncheckedCast(locator->locate(query));
+    // We need setConfiguration() to run synchronously, but locate() returns the proxy for the
+    // configuration IceStorm topic, which is asynchronous. Instead, call locateAll() and pick
+    // out the real ConfigurationServicePrx.
+    Ice::ObjectProxySeq objs = locator->locateAll(query);
+    for (Ice::ObjectProxySeq::iterator obj = objs.begin(); obj != objs.end(); ++obj)
+    {
+        try
+        {
+            return ConfigurationServicePrx::checkedCast(*obj);
+        }
+        catch (...)
+        {
+            // must be the IceStorm topic. try again
+        }
+    }
+    return 0;
 }
 
 ServiceLocatorPrx getLocator(const Ice::CommunicatorPtr& comm)
@@ -177,7 +222,7 @@ BOOST_AUTO_TEST_CASE(UdptlSessionWithICEEnabled)
 
         ConfigurationGroupSeq s;
         s.push_back(iceGroup);
-        configPrx->setConfiguration(s);
+        configPrx->setConfiguration(AsteriskSCF::Operations::createContext(), s);
 
         UDPTLMediaServicePrx servicePrx;
         {
@@ -195,7 +240,7 @@ BOOST_AUTO_TEST_CASE(UdptlSessionWithICEEnabled)
             query->enableICE = true;
             query->enableTURN = false;
             UDPTLSessionPrx sessionPrx;
-            BOOST_REQUIRE_NO_THROW(sessionPrx = servicePrx->allocate(query));
+            BOOST_REQUIRE_NO_THROW(sessionPrx = servicePrx->allocate(AsteriskSCF::Operations::createContext(), query));
             BOOST_REQUIRE(sessionPrx != 0);
             sessionPrx->ice_ping(); // To silence unused arg warning.
             sessionPrx->release();
@@ -240,14 +285,14 @@ BOOST_AUTO_TEST_CASE(UdptlSessionAllocationFailure)
         iceGroup->configurationItems[UDPTLICETransportFlagsItemName] = iceFlags;
         ConfigurationGroupSeq s;
         s.push_back(iceGroup);
-        BOOST_REQUIRE_NO_THROW(configPrx->setConfiguration(s));
+        BOOST_REQUIRE_NO_THROW(configPrx->setConfiguration(AsteriskSCF::Operations::createContext(), s));
         try
         {
             UDPTLOverICEServiceLocatorParamsPtr query = new UDPTLOverICEServiceLocatorParams;
             query->category = "udptl";
             query->enableICE = true;
             query->enableTURN = true;
-            UDPTLSessionPrx sessionPrx = servicePrx->allocate(query);
+            UDPTLSessionPrx sessionPrx = servicePrx->allocate(AsteriskSCF::Operations::createContext(), query);
             sessionPrx->ice_ping();
         }
         catch (const SessionAllocationFailure& ex)

-----------------------------------------------------------------------


-- 
asterisk-scf/release/media_transport_udptl.git



More information about the asterisk-scf-commits mailing list