[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue May 8 17:28:02 CDT 2012


branch "master" has been updated
       via  29b69ef13c04cb6aae14e39911edae18c3f54f8f (commit)
      from  69ac9b67a06a178901a56686b0d28028d5d98194 (commit)

Summary of changes:
 config/RTP.config                                  |    2 +-
 config/test_component.conf                         |    4 +
 config/test_component_v6.conf                      |    5 +
 config/test_rtp_ice.conf                           |   11 +-
 .../MediaRTPPJMEDIA/RTPStateReplicationIf.ice      |   13 +-
 src/Component.cpp                                  |  118 +++++--
 src/ICETransport.cpp                               |   97 ++++--
 src/RTPConfiguration.cpp                           |  136 +++++---
 src/RTPSession.cpp                                 |  377 ++++++++++++++------
 src/RTPSink.cpp                                    |   99 ++++--
 src/RTPSink.h                                      |    6 +-
 src/RTPSource.cpp                                  |   44 ++-
 src/RTPSource.h                                    |    7 +-
 src/RTPStateReplicator.h                           |    4 +-
 src/RTPStateReplicatorListener.cpp                 |  112 +++++--
 src/RTPTelephonyEventSink.cpp                      |   38 ++-
 src/RTPTelephonyEventSink.h                        |    7 +-
 src/RTPTelephonyEventSource.cpp                    |    7 +-
 src/RTPTelephonyEventSource.h                      |    2 +
 test/CMakeLists.txt                                |    3 +
 test/TestRTPICE.cpp                                |   43 ++-
 test/TestRTPpjmedia.cpp                            |  182 ++++++++--
 22 files changed, 950 insertions(+), 367 deletions(-)


- Log -----------------------------------------------------------------
commit 29b69ef13c04cb6aae14e39911edae18c3f54f8f
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue May 8 11:40:51 2012 -0500

    Changes for new retry logic.

diff --git a/config/RTP.config b/config/RTP.config
index 47b975d..30e0289 100644
--- a/config/RTP.config
+++ b/config/RTP.config
@@ -12,7 +12,7 @@ endport=20000
 workerthreadcount=4
 
 # IPv4 address we should bind sessions to
-#ipv4bind=
+ipv4bind=127.0.0.1
 
 # IPv6 address we should bind sessions to
 #ipv6bind=
diff --git a/config/test_component.conf b/config/test_component.conf
index 8b7b63b..377d264 100644
--- a/config/test_component.conf
+++ b/config/test_component.conf
@@ -3,6 +3,7 @@
 #
 # Icebox Configuration
 #
+Ice.ThreadPool.Client.Size=4
 
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,RTPStateReplicator,MediaServiceRTP,MediaRTPpjmediaTest
@@ -15,6 +16,9 @@ Ice.Override.Timeout=5000
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/config/test_component_v6.conf b/config/test_component_v6.conf
index 6c991c6..8daa140 100644
--- a/config/test_component_v6.conf
+++ b/config/test_component_v6.conf
@@ -4,6 +4,8 @@
 # Icebox Configuration
 #
 
+Ice.ThreadPool.Client.Size=4
+
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,RTPStateReplicator,MediaServiceRTP,MediaRTPpjmediaTest
 
@@ -15,6 +17,9 @@ Ice.Override.Timeout=5000
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone=true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/config/test_rtp_ice.conf b/config/test_rtp_ice.conf
index 4edbeca..68fbee7 100644
--- a/config/test_rtp_ice.conf
+++ b/config/test_rtp_ice.conf
@@ -4,6 +4,7 @@
 # Icebox Configuration
 #
 RTPConfiguration.Name=rtpoice
+Ice.ThreadPool.Client.Size=4
 IceBox.InheritProperties=1
 IceBox.LoadOrder=ServiceDiscovery,MediaRTPpjmedia,MediaRTPpjmediaTest
 
@@ -24,10 +25,11 @@ LocatorService.Proxy=LocatorService:tcp -h 127.0.0.1 -p 4411
 
 IceBox.Service.MediaRTPpjmedia=MediaRTPPJMEDIA:create
 
+MediaRTPpjmedia.ServiceName=test
+
 # Adapter parameters for this component
-MediaRTPpjmediaAdapter.Endpoints=default -h 127.0.0.1
-MediaRTPpjmediaAdapterLocal.Endpoints=default -h 127.0.0.1
-MediaRTPpjmediaAdapterLogger.Endpoints=default -h 127.0.0.1
+MediaRTPpjmedia.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 4471
+MediaRTPpjmedia.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 4472
 
 # A proxy to the service locator management service
 ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -h 127.0.0.1 -p 4422
@@ -47,6 +49,9 @@ IceBox.Service.MediaRTPpjmediaTest=MediaRTPPJMEDIAIceTest:create
 
 IceBox.Service.ServiceDiscovery=ServiceLocator:create
 
+# For unit test we run without state replication.
+ServiceDiscovery.Standalone = true
+
 ServiceDiscovery.IceStorm.InstanceName=ServiceDiscovery
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=tcp -h 127.0.0.1 -p 4421
diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
index 1bdd788..4d7099f 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice
@@ -23,6 +23,7 @@
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.ice>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
 #include <AsteriskSCF/System/Component/ConfigurationIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
 
 module AsteriskSCF
 {
@@ -53,16 +54,16 @@ module V1
 
     interface RTPStateReplicatorListener
     {
-	void stateRemoved(Ice::StringSeq itemKeys);
-	void stateSet(RTPStateItemSeq items);
+	void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+	void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateItemSeq items);
     };
 
     interface RTPStateReplicator
     {
-	void addListener(RTPStateReplicatorListener *listener);
-	void removeListener(RTPStateReplicatorListener *listener);
-	void setState (RTPStateItemSeq items);
-	void removeState(Ice::StringSeq items);
+	void addListener(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateReplicatorListener *listener);
+	void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, RTPStateReplicatorListener *listener);
+	void setState (AsteriskSCF::System::V1::OperationContext operationContext, RTPStateItemSeq items);
+	void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
 	idempotent RTPStateItemSeq getState(Ice::StringSeq itemKeys);
 	idempotent RTPStateItemSeq getAllState();
     };
diff --git a/src/Component.cpp b/src/Component.cpp
index 43d1166..e1956dc 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -22,40 +22,47 @@
 #include <IceUtil/UUID.h>
 
 #include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
+#include <AsteriskSCF/Component/Component.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Logger/IceLogger.h>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-#include <AsteriskSCF/Logger/IceLogger.h>
-#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
+#include "PJMEDIAEnvironment.h"
+#include "RTPConfiguration.h"
+#include "RTPConfigurationIf.h"
 #include "RTPReplicationContext.h"
 #include "RTPSession.h"
 #include "RTPStateReplicator.h"
-#include "RTPConfiguration.h"
-#include "RTPConfigurationIf.h"
-#include "PJMEDIAEnvironment.h"
 
-using namespace std;
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
 using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
-using namespace AsteriskSCF::PJMEDIARTP;
+using namespace std;
 
 namespace
 {
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
+typedef ContextResultData<RTPSessionPrx> AllocateResultData;
+typedef boost::shared_ptr<AllocateResultData> AllocateResultDataPtr;
+
 }
 
 static const string ReplicaServiceId("MediaRTPReplica");
@@ -73,10 +80,11 @@ public:
       const ConfigurationServiceImplPtr&);
 
     RTPSessionPrx allocate(
-            const RTPServiceLocatorParamsPtr&,
-            const RTPOptionsPtr&,
-            RTPAllocationOutputsPtr&,
-            const Ice::Current&);
+        const AsteriskSCF::System::V1::OperationContextPtr&,
+        const RTPServiceLocatorParamsPtr&,
+        const RTPOptionsPtr&,
+        RTPAllocationOutputsPtr&,
+        const Ice::Current&);
 
     pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
 
@@ -86,6 +94,7 @@ public:
     }
 
 private:
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     PJMEDIAEnvironmentPtr mEnvironment;
     RTPReplicationContextPtr mReplicationContext;
@@ -202,6 +211,7 @@ private:
     virtual void onPreInitialize();
     virtual void onStop();
     virtual void onStart();
+    virtual void onActivated();
 
     // Other base Component overrides
     virtual void prepareBackplaneServicesForDiscovery();
@@ -236,12 +246,12 @@ private:
 
 void Component::onSuspend()
 {
-    mGeneralState->serviceManagement->suspend();
+    mGeneralState->serviceManagement->suspend(AsteriskSCF::Operations::createContext());
 }
 
 void Component::onResume()
 {
-    mGeneralState->serviceManagement->unsuspend();
+    mGeneralState->serviceManagement->unsuspend(AsteriskSCF::Operations::createContext());
 }
 
 /**
@@ -250,6 +260,7 @@ void Component::onResume()
 RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
     const RTPReplicationContextPtr& replicationContext,
     const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mAdapter(adapter),
     mEnvironment(PJMEDIAEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
     mReplicationContext(replicationContext),
@@ -261,12 +272,24 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
  * Implementation of the allocate method as defined in MediaRTPIf.ice
  */
 RTPSessionPrx RTPMediaServiceImpl::allocate(
-        const RTPServiceLocatorParamsPtr& params,
-        const RTPOptionsPtr& options,
-        RTPAllocationOutputsPtr& outputs,
-        const Ice::Current&)
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const RTPServiceLocatorParamsPtr& params,
+    const RTPOptionsPtr& options,
+    RTPAllocationOutputsPtr& outputs,
+    const Ice::Current&)
 {
-    return AsteriskSCF::PJMEDIARTP::RTPSession::create(
+    std::pair<bool, AllocateResultDataPtr> data =
+        getContextSync<AllocateResultDataPtr>(mOperationContextCache, context);
+
+    if (data.first)
+    {
+        // retry detected
+        return data.second->getResult();
+    }
+
+    try
+    {
+        RTPSessionPrx r = AsteriskSCF::PJMEDIARTP::RTPSession::create(
             mAdapter,
             IceUtil::generateUUID(),
             params,
@@ -275,6 +298,19 @@ RTPSessionPrx RTPMediaServiceImpl::allocate(
             mConfigurationService,
             options,
             outputs);
+        data.second->setResult(r);
+        return r;
+    }
+    catch (const std::exception& e)
+    {
+        data.second->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data.second->setException();
+        throw;
+    }
 }
 
 void Component::onPreInitialize()
@@ -356,7 +392,7 @@ void Component::createPrimaryServices()
 
         if (rtpReplicationContext->isActive() == true)
         {
-            getServiceLocatorManagement()->addCompare(getName() + ".RTP.Comparator", mRTPMediaComparatorServicePrx);
+            getServiceLocatorManagement()->addCompare(AsteriskSCF::Operations::createContext(), getName() + ".RTP.Comparator", mRTPMediaComparatorServicePrx);
         }
 
     }
@@ -413,7 +449,7 @@ void Component::findRemoteServices()
         // replicator service.
         ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
             rtpReplicationContext->getReplicator().initialize(), ReplicatorFacet);
-        configurationReplicator->registerConfigurationService(mConfigurationServicePrx);
+        configurationReplicator->registerConfigurationService(AsteriskSCF::Operations::createContext(), mConfigurationServicePrx);
 
     }
     catch (const std::exception& e)
@@ -466,7 +502,7 @@ void Component::listenToStateReplicators()
         // Are we in standby mode?
         if (rtpReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
         {
-            rtpReplicationContext->getReplicator()->addListener(mReplicatorListenerProxy);
+            rtpReplicationContext->getReplicator()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
             mListeningToReplicator = true;
         }
     }
@@ -494,7 +530,7 @@ void Component::stopListeningToStateReplicators()
 
     try
     {
-        rtpReplicationContext->getReplicator()->removeListener(mReplicatorListenerProxy);
+        rtpReplicationContext->getReplicator()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
         mListeningToReplicator = false;
     }
     catch (const Ice::Exception& e)
@@ -532,7 +568,19 @@ void Component::onRegisterPrimaryServices()
     }
 
     mGeneralState->serviceManagement = mRTPMediaServiceRegistration->getServiceManagement();
-    mGeneralState->serviceManagement->addLocatorParams(mRTPOverIceLocatorParams, getName() + ".RTP.Comparator");
+    mGeneralState->serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(),
+        mRTPOverIceLocatorParams, getName() + ".RTP.Comparator");
+}
+
+void Component::onActivated()
+{
+    RTPReplicationContextPtr rtpReplicationContext =
+        boost::static_pointer_cast<RTPReplicationContext>(getReplicationContext());
+
+    RTPStateItemSeq items;
+    items.push_back(mGeneralState);
+    RTPStateReplicatorPrx oneway = RTPStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
+    oneway->setState(AsteriskSCF::Operations::createContext(), items);
 }
 
 void Component::onStart()
@@ -545,7 +593,7 @@ void Component::onStart()
         RTPStateItemSeq items;
         items.push_back(mGeneralState);
         RTPStateReplicatorPrx oneway = RTPStateReplicatorPrx::uncheckedCast(rtpReplicationContext->getReplicator()->ice_oneway());
-        oneway->setState(items);
+        oneway->setState(AsteriskSCF::Operations::createContext(), items);
     }
 }
 
@@ -553,10 +601,10 @@ void Component::onStop()
 {
     if (getReplicationContext()->isActive() == true)
     {
-       mGeneralState->serviceManagement->unregister();
+       mGeneralState->serviceManagement->unregister(AsteriskSCF::Operations::createContext());
     }
 
-    getServiceLocatorManagement()->removeCompare(getName() + ".RTP.Comparator");
+    getServiceLocatorManagement()->removeCompare(AsteriskSCF::Operations::createContext(), getName() + ".RTP.Comparator");
 }
 
 extern "C"
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index 994b1ad..b36c996 100644
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -14,39 +14,38 @@
  * at the top of the source tree.
  */
 
-#include "ICETransport.h"
-#include "PJUtil.h"
-
-#include <pjmedia.h>
 #include <pjlib.h>
+#include <pjmedia.h>
 #include <pjnath.h>
 
-#include <AsteriskSCF/System/ExceptionsIf.h>
 #include <map>
+#include <sstream>
+
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
-#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
 #include <Ice/Ice.h>
-#include <sstream>
-#include <AsteriskSCF/Logger.h>
 #include <IceUtil/UUID.h>
 
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
+
+#include "ICETransport.h"
+#include "PJUtil.h"
+
+using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
-using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF::PJUtil;
-using namespace std;
-using namespace AsteriskSCF::Helpers;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace std;
 
 namespace
 {
 Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-}
-
-namespace 
-{
 
 class ICEAgentImpl : public InteractiveConnectionAgent
 {
@@ -54,6 +53,7 @@ public:
 
     ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMEDIAEnvironmentPtr& env,
         const PJMEDIAEndpointPtr& ep) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter),
         mId(id),
         mShuttingDown(false),
@@ -84,10 +84,22 @@ public:
         return mRole;
     }
 
+    typedef AMDContextResultData<CandidatePtr, AMD_InteractiveConnectionAgent_negotiatePtr> NegotiateContextData;
+    typedef boost::shared_ptr<NegotiateContextData> NegotiateContextDataPtr;
+
     void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
-        const string& hostname, Ice::Int port, const CandidateSeq& candidates,
-        const Ice::Current&)
+            const AsteriskSCF::System::V1::OperationContextPtr& context,
+            const string& hostname, Ice::Int port, const CandidateSeq& candidates,
+            const Ice::Current&)
     {
+        NegotiateContextDataPtr data =
+            getContext<NegotiateContextData>(mOperationContextCache, context, callback);
+
+        if (!data)
+        {
+            // retry detected
+            return;
+        }
 
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         stateCheck();
@@ -99,12 +111,12 @@ public:
             // TODO: are we going to support cancellable negotiations.
             //
         }
-        mCurrentNegotiation = callback;
+        mCurrentNegotiation = data->getProxy();
 
         //
-        // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal 
+        // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
         //
-        pjmedia_sdp_session* remoteSDPSession = 
+        pjmedia_sdp_session* remoteSDPSession =
             static_cast<pjmedia_sdp_session*>(pj_pool_zalloc(mEnv->memoryPool(), sizeof(pjmedia_sdp_session)));
 
 
@@ -178,7 +190,7 @@ public:
 
         //
         // I was concerned about the fact that for a given SIP session, there might be multiple media
-        // streams and multiple candidates. I'm not sure that its actually too much of an issue even 
+        // streams and multiple candidates. I'm not sure that its actually too much of an issue even
         // if multiple media types are muxed on a single ICE negotiated flow, but there will need to be
         // some redesign to pull in the multiple media streams associated with the session. For the moment
         // we will operation under the premise that we are dealing with a single media stream.
@@ -191,7 +203,7 @@ public:
         {
             CandidatePtr candidate = *i;
             ostringstream os;
-            os << "candidate:" << candidate->foundation << ' ' << candidate->componentId <<  " UDP " << 
+            os << "candidate:" << candidate->foundation << ' ' << candidate->componentId <<  " UDP " <<
                 candidate->priority << ' ' << candidate->mappedAddress << ' ' << candidate->mappedPort << " typ ";
             string hostType;
             switch (candidate->type)
@@ -216,7 +228,7 @@ public:
             }
             string t = os.str();
             pj_str_t candidateStr = pj_str(const_cast<char*>(t.c_str()));
-            pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(), 
+            pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
                 "candidate", &candidateStr);
             pjmedia_sdp_attr_add(&currentMedia->attr_count, currentMedia->attr, newAttribute);
         }
@@ -425,7 +437,7 @@ public:
                             candidateObj->mappedAddress = candidateObj->baseAddress;
                             candidateObj->mappedPort = candidateObj->basePort;
                             candidateObj->baseAddress = baseAddress;
-                            candidateObj->basePort = basePort; 
+                            candidateObj->basePort = basePort;
                         }
                         else
                         {
@@ -493,6 +505,7 @@ public:
 
 private:
     boost::shared_mutex mLock;
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     Ice::Identity mId;
     bool mShuttingDown;
@@ -553,7 +566,7 @@ boost::shared_mutex ICECallbackAdapter::mLock;
 // invoked before we get a chance to add the transport.  The solution to that is to allow an entry to be created when
 // the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
 // simply update the appropriate field.
-// 
+//
 void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const ICETransportPtr& callback)
 {
     bool alreadyKnown = false;
@@ -626,7 +639,7 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
 {
     //
     // AFAICT, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
-    // 
+    //
     switch (operation)
     {
         case PJ_ICE_STRANS_OP_INIT:
@@ -684,18 +697,18 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
         case PJ_ICE_STRANS_OP_KEEP_ALIVE:
             //
             // Keep alive has successfully completed. FWICT this should not get here.
-            // 
+            //
             break;
-    };
+    }
 }
 
-}
+} // anonymous namespace
 
 ICETransport::~ICETransport()
 {
     //
     // TODO : cleanup ICE transport, the transport itself is closed by the parent class.
-    // 
+    //
     ICECallbackAdapter::removeEntry(mTransport);
 }
 
@@ -728,7 +741,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
         {
             //
             // Address has changed! We need to let Session listeners know!
-            // TODO! 
+            // TODO!
             //
             pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
         }
@@ -738,21 +751,33 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
     mMonitor.notify_one();
 }
 
-AddressPtr ICETransport::localAddress() 
+AddressPtr ICETransport::localAddress()
 {
     boost::unique_lock<boost::mutex> lock(mLock);
     if (mLocalAddress)
     {
         return mLocalAddress;
     }
+    //
+    // Retry check for local address for max of 2.5 seconds, then proceed as if it was unknown.
+    //
     for (size_t i = 0; i < 5 && !mLocalAddress; ++i)
     {
-        mMonitor.wait(lock);
+        try
+        {
+            mMonitor.timed_wait(lock, boost::posix_time::milliseconds(500));
+        }
+        catch (const boost::thread_interrupted&)
+        {
+        }
+        catch (const boost::thread_resource_error&)
+        {
+        }
     }
     return mLocalAddress;
 }
 
-AddressPtr ICETransport::remoteAddress() 
+AddressPtr ICETransport::remoteAddress()
 {
     boost::unique_lock<boost::mutex> lock(mLock);
     return mRemoteAddress;
@@ -791,7 +816,7 @@ void ICETransport::start()
     PJICECallbackPtr callback(new pjmedia_ice_cb);
     callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
     NATModulePtr natModule = NATModule::create(mConfig, mEndpoint);
-    pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1), 
+    pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
         natModule->configuration(), callback.get(), &t);
     if (fail(result))
     {
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index 3c5650a..178bf22 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,19 +14,22 @@
  * at the top of the source tree.
  */
 
-#include "RTPConfigurationIf.h"
-#include "RTPConfiguration.h"
 
 #include <IceUtil/UUID.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
-using namespace AsteriskSCF::System::Configuration::V1;
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+
+#include "RTPConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace std;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 
 /**
  * Implementation of the configuration service.
@@ -34,24 +37,25 @@ using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
 {
 public:
+    ConfigurationServiceServant() : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) {}
 
     /**
      * AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
      */
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-    
+
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-    
+
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationGroups(const Ice::Current&);
 
-    void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
 
-    void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+    void removeConfigurationItems(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
             const Ice::Current&);
-    void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+    void removeConfigurationGroups(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
             const Ice::Current&);
 
     /**
@@ -90,12 +94,12 @@ public:
     {
         mICEConfig = newConfig;
     }
-    
+
     void replaceConfig(const NATConfigPtr& newConfig)
     {
         mNATConfig = newConfig;
     }
-    
+
     void replaceConfig(const SRTPConfigurationPtr& newConfig)
     {
         mSRTPConfig = newConfig;
@@ -121,7 +125,7 @@ public:
         //
         if (mGeneralGroup)
         {
-            ConfigurationItemDict::const_iterator item = 
+            ConfigurationItemDict::const_iterator item =
                 mGeneralGroup->configurationItems.find(EnableSRTPItemName);
             if (item != mGeneralGroup->configurationItems.end())
             {
@@ -150,6 +154,7 @@ public:
     }
 
 private:
+    OperationContextCachePtr mOperationContextCache;
     /**
      * General RTP configuration
      */
@@ -223,7 +228,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
         void visitRTPICEConfigurationGroup(const RTPICEConfigurationGroupPtr& group)
         {
             RTPICEConfigurationGroupPtr currentGroup = mImpl->getICEConfigurationGroup();
-            
+
             if (!currentGroup)
             {
                 return;
@@ -233,19 +238,19 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
                     returnedGroup->configurationItems);
             mGroups.push_back(returnedGroup);
         }
-        
+
         ConfigurationServiceServantPtr mImpl;
         ConfigurationGroupSeq& mGroups;
     };
-    
+
     ConfigurationGroupSeq newGroups;
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
-    
+
     return newGroups;
 }
 
@@ -259,7 +264,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
             mImpl(impl), mGroups(visitorGroups)
         {
         }
- 
+
     private:
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
         {
@@ -278,21 +283,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
                 mGroups.push_back(g);
             }
         }
-        
+
         ConfigurationServiceServantPtr mImpl;
         ConfigurationGroupSeq& mGroups;
     };
-    
+
     ConfigurationGroupSeq newGroups;
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
 
     boost::shared_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
-    
+
     return newGroups;
 }
 
@@ -300,7 +305,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
 {
     ConfigurationGroupSeq groups;
     boost::shared_lock<boost::shared_mutex> lock(mLock);
-   
+
     if (mGeneralGroup)
     {
         groups.push_back(new RTPGeneralGroup);
@@ -309,21 +314,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
     {
         groups.push_back(new RTPICEConfigurationGroup);
     }
-    
+
     return groups;
 }
 
-void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr& context, const ConfigurationGroupSeq& groups,
     const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
- 
+
     private:
         /**
          * Helper function which performs serial number checking of items
@@ -340,22 +345,22 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
                 {
                     continue;
                 }
-  
+
                 ConfigurationItemDict::const_iterator localItem = localItems.find(item->first);
-  
+
                 if (localItem == localItems.end())
                 {
                     // This is a new item so serial checking does not apply
                     continue;
                 }
-  
+
                 if (item->second->serialNumber < localItem->second->serialNumber)
                 {
                     throw SerialConflict(group, item->second);
                 }
             }
         }
- 
+
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
         {
             RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -367,7 +372,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
             {
                 performSerialCheck(group->configurationItems, g->configurationItems, group);
             }
-     
+
             for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
                  item != group->configurationItems.end();
                  ++item)
@@ -415,7 +420,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
                         mImpl->replaceConfig(ICEConfiguration::create(mMaxCandidates, mMaxCalls));
                     }
                 }
-                
+
 
                 void visitSTUNServerItem(const STUNServerItemPtr& item)
                 {
@@ -454,11 +459,11 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
                         mMaxCandidates = item->maxCandidates;
                     }
                 }
-                
+
             private:
 
                 ConfigurationServiceServantPtr mImpl;
-                
+
                 bool mCreateRTPICEConfig;
                 bool mCreateNATConfig;
 
@@ -481,7 +486,7 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
             }
 
             RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
-     
+
             for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
                  item != group->configurationItems.end();
                  ++item)
@@ -495,7 +500,13 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
 
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
 
     boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -506,12 +517,13 @@ void ConfigurationServiceServant::setConfiguration(const ConfigurationGroupSeq&
 }
 
 void ConfigurationServiceServant::removeConfigurationItems(
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
@@ -535,7 +547,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                 localItems.erase(localItem);
             }
         }
- 
+
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
         {
             RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -550,7 +562,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
             //
             // The Visitor in this case interprets the provided information
             // to decide what it needs to do. This type of thing might be better moved
-            // into a separate helper class (along with the set operations), but 
+            // into a separate helper class (along with the set operations), but
             // we'll leave it here for now to avoid inconsistencies with the
             // rest of the configuration implementation.
             //
@@ -565,7 +577,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                     mResetICEConfig(false)
                 {
                 }
-                
+
                 ~Visitor()
                 {
                     if (mDisable)
@@ -601,7 +613,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                         mImpl->replaceConfig(ICEConfigurationPtr());
                     }
                 }
-                
+
                 void visitSTUNServerItem(const STUNServerItemPtr& item)
                 {
                     if (item)
@@ -633,7 +645,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                         mResetICEConfig = true;
                     }
                 }
-                
+
             private:
                 ConfigurationServiceServantPtr mImpl;
                 bool mRemoveSTUNServer;
@@ -645,37 +657,45 @@ void ConfigurationServiceServant::removeConfigurationItems(
             };
 
             RTPICEConfigurationGroupPtr g = mImpl->getICEConfigurationGroup();
-            
+
             if (g)
             {
                 RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
                 removeItems(v, group->configurationItems, g->configurationItems);
             }
         }
- 
+
     private:
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
 }
 
-void ConfigurationServiceServant::removeConfigurationGroups(const ConfigurationGroupSeq& groups, const Ice::Current&)
+void ConfigurationServiceServant::removeConfigurationGroups(
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const ConfigurationGroupSeq& groups, const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
- 
+
     private:
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
         {
@@ -689,11 +709,17 @@ void ConfigurationServiceServant::removeConfigurationGroups(const ConfigurationG
 
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
 
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 4cccbb1..d3fe04c 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -32,21 +32,24 @@
 #include <IceUtil/UUID.h>
 
 #include <AsteriskSCF/Media/MediaIf.h>
-#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media;
-using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace std;
 
 /**
  * RTCP Information Interface implementation.
@@ -54,7 +57,8 @@ using namespace AsteriskSCF::SessionCommunications::V1;
 class RTCPInformationImpl : public RTCP::V1::Information
 {
 public:
-    RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream) :
+    RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream, const OperationContextCachePtr& cache) :
+        mOperationContextCache(cache),
         mGeneralStatistics(general), mStreamStatistics(stream) { }
 
     RTCP::V1::StatisticsPtr getStatistics(const Ice::Current&)
@@ -88,14 +92,24 @@ public:
         return statistics;
     }
 
-    void addListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+    void addListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
     {
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
 	boost::unique_lock<boost::shared_mutex> lock(mLock);
         mListeners.push_back(listener);
     }
 
-    void removeListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+    void removeListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
     {
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
 	boost::unique_lock<boost::shared_mutex> lock(mLock);
         mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
     }
@@ -115,6 +129,8 @@ private:
      */
     boost::shared_mutex mLock;
 
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * Listeners present.
      */
@@ -142,22 +158,22 @@ typedef IceUtil::Handle<RTCPInformationImpl> RTCPInformationImplPtr;
 class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
 {
 public:
-    RTPSessionImpl(const Ice::ObjectAdapterPtr&, 
-            const string& id,
-            const PJMEDIAEnvironmentPtr& env,
-            const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
-	        const RTPReplicationContextPtr& replicationContext,
-	        const ConfigurationServiceImplPtr&);
-    
+    RTPSessionImpl(const Ice::ObjectAdapterPtr&,
+        const string& id,
+        const PJMEDIAEnvironmentPtr& env,
+        const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+        const RTPReplicationContextPtr& replicationContext,
+        const ConfigurationServiceImplPtr&);
+
     RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
-            const string& sessionIdentity,    
-            const PJMEDIAEnvironmentPtr& env,
-            Ice::Int port,
-            const AsteriskSCF::Media::V1::FormatSeq& formats,
-            bool isIPv6,
-            bool srtp,
-	        const RTPReplicationContextPtr& replicationContext,
-            const ConfigurationServiceImplPtr& configurationServant);
+        const string& sessionIdentity,
+        const PJMEDIAEnvironmentPtr& env,
+        Ice::Int port,
+        const AsteriskSCF::Media::V1::FormatSeq& formats,
+        bool isIPv6,
+        bool srtp,
+        const RTPReplicationContextPtr& replicationContext,
+        const ConfigurationServiceImplPtr& configurationServant);
 
     ~RTPSessionImpl();
 
@@ -168,26 +184,29 @@ public:
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     std::string getId(const Ice::Current&);
     void setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookies);
-    void setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies, 
-                    const Ice::Current&);
-    void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb, 
-                          const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet, 
+    void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::Media::V1::SessionCookies& cookies,
+            const Ice::Current&);
+    void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
+                          const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
                           const Ice::Current&);
-    void removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies, 
-                const Ice::Current&);
+    void removeCookies(
+            const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::Media::V1::SessionCookies& cookies,
+            const Ice::Current&);
 
     void useRTCP(bool, const Ice::Current&);
     void release(const Ice::Current&);
-    void associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
+    void associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
 
     /**
      * AsteriskSCF::Media::V1::SRTPSession implementation.
      */
-    void setOptions(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
-    void start(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
+    void setOptions(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
+    void start(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
 
     /**
-     * Internal methods. 
+     * Internal methods.
      */
     AsteriskSCF::Media::V1::FormatSeq getFormats();
     void setRemoteDetails(const std::string& address, Ice::Int port);
@@ -195,7 +214,7 @@ public:
     int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
 
     /**
-     * Accessors for the source and sink servants. 
+     * Accessors for the source and sink servants.
      */
     StreamSourceRTPImplPtr getSourceServant();
     StreamSinkRTPImplPtr getSinkServant();
@@ -242,6 +261,8 @@ private:
 
     boost::shared_mutex mLock;
 
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * Instance of the session adapter to be passed to sinks, sources, configuration, etc.
      */
@@ -308,7 +329,7 @@ private:
      */
     RTPSessionStateItemPtr mSessionStateItem;
 
-    // Context for state replication for this component. 
+    // Context for state replication for this component.
     RTPReplicationContextPtr mReplicationContext;
 
     /**
@@ -333,6 +354,8 @@ private:
 
     TelephonyEventSourcePrx mTelephonyEventSourcePrx;
     TelephonyEventSinkPrx mTelephonyEventSinkPrx;
+
+    AsteriskSCF::Media::V1::SessionCookieDict mCookies;
 };
 
 /**
@@ -349,7 +372,7 @@ public:
     /**
      * Constructor for this implementation.
      */
-    RTCPSessionImpl(const RTPSessionImplPtr& session) : mSession(session) { }
+    RTCPSessionImpl(const RTPSessionImplPtr& session, const OperationContextCachePtr& cache) : mOperationContetCache(cache), mSession(session) { }
 
     /**
      * Method used to retrieve the port our RTCP session is listening on.
@@ -364,12 +387,35 @@ public:
         return pj_sockaddr_get_port(&transportInfo.sock_info.rtcp_addr_name);
     }
 
-    void setRemoteDetails(const std::string& address, Ice::Int port, const Ice::Current&)
+    void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context, const std::string& address, Ice::Int port, const Ice::Current&)
     {
-        mSession->setRemoteRtcpDetails(address, port);
+        ContextDataPtr data = checkAndThrow(mOperationContetCache, context);
+        if (!data)
+        {
+            // retry detected
+            return;
+        }
+
+        try
+        {
+            mSession->setRemoteRtcpDetails(address, port);
+            data->setCompleted();
+        }
+        catch (const std::exception& e)
+        {
+            data->setException(e);
+            throw;
+        }
+        catch (...)
+        {
+            data->setException();
+            throw;
+        }
     }
 
 private:
+    OperationContextCachePtr mOperationContetCache;
+
     /**
      * Pointer to the RTP session.
      */
@@ -389,7 +435,7 @@ public:
     {
     }
 
-    void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem) 
+    void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem)
     {
         mServant->replicateState(0, sinkStateItem, 0, 0, 0);
     }
@@ -409,12 +455,12 @@ public:
         mServant->replicateState(0, 0, 0, 0, item);
     }
 
-    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload) 
+    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload)
     {
         return mServant->getFormat(payload);
     }
 
-    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) 
+    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format)
     {
         return mServant->getPayload(format);
     }
@@ -466,7 +512,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
         const PJMEDIAEnvironmentPtr& env,
         const RTPServiceLocatorParamsPtr& params,
 	    const RTPReplicationContextPtr& replicationContext,
-        const ConfigurationServiceImplPtr& configurationService) : 
+        const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mEnvironment(env),
     mEndpoint(PJMEDIAEndpoint::create(env)),
     mId(id),
@@ -594,42 +641,71 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
     return mId;
 }
 
-/** 
- * Local implementation. 
+/**
+ * Local implementation.
  */
 void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookieMap)
 {
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    mSessionStateItem->cookies = cookieMap;
+    mCookies = cookieMap;
+
+    if (mSessionStateItem)
+    {
+        mSessionStateItem->cookies = cookieMap;
+    }
 }
 
-/** 
- * Support for the corresponding API call. 
+/**
+ * Support for the corresponding API call.
  */
-void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies, 
-                const Ice::Current&)
+void RTPSessionImpl::setCookies(
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const AsteriskSCF::Media::V1::SessionCookies& cookies,
+        const Ice::Current&)
 {
-    { // scope the lock
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
-             i != cookies.end();  ++i)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    try
+    {
+        { // scope the lock
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+                 i != cookies.end();  ++i)
+            {
+                mCookies[(*i)->ice_id()] = (*i);
+            }
+        }
+
+        if (mReplicationContext->isReplicating() == true)
         {
-            mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
+            mSessionStateItem->cookies = mCookies;
+            replicateState(mSessionStateItem, 0, 0, 0, 0);
         }
+        data->setCompleted();
     }
-
-    if (mReplicationContext->isReplicating() == true)
+    catch (const std::exception& e)
     {
-        replicateState(mSessionStateItem, 0, 0, 0, 0);
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
 }
 
-/** 
- * Implementation of the corresponding API call. 
+/**
+ * Implementation of the corresponding API call.
  */
 void RTPSessionImpl::getCookies_async(
         const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
-        const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet, 
+        const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
         const Ice::Current&)
 {
     AsteriskSCF::Media::V1::SessionCookies results;
@@ -639,9 +715,9 @@ void RTPSessionImpl::getCookies_async(
 	    i != cookiesToGet.end();
 	    ++i)
     {
-	AsteriskSCF::Media::V1::SessionCookieDict::const_iterator cookie = mSessionStateItem->cookies.find((*i)->ice_id());
+	AsteriskSCF::Media::V1::SessionCookieDict::const_iterator cookie = mCookies.find((*i)->ice_id());
 
-	if (cookie == mSessionStateItem->cookies.end())
+	if (cookie == mCookies.end())
 	{
 	    continue;
 	}
@@ -652,24 +728,48 @@ void RTPSessionImpl::getCookies_async(
     cb->ice_response(results);
 }
 
-/** 
- * Implementation of the corresponding API call. 
+/**
+ * Implementation of the corresponding API call.
  */
-void RTPSessionImpl::removeCookies(const AsteriskSCF::Media::V1::SessionCookies& cookies, 
-                const Ice::Current&)
+void RTPSessionImpl::removeCookies(
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const AsteriskSCF::Media::V1::SessionCookies& cookies,
+        const Ice::Current&)
 {
-    { // scope the lock
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
-                i != cookies.end(); ++i)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    try
+    {
+        { // scope the lock
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+                 i != cookies.end(); ++i)
+            {
+                mCookies.erase((*i)->ice_id());
+            }
+        }
+
+        if (mReplicationContext->isReplicating() == true)
         {
-            mSessionStateItem->cookies.erase((*i)->ice_id());
+            mSessionStateItem->cookies = mCookies;
+            replicateState(mSessionStateItem, 0, 0, 0, 0);
         }
+        data->setCompleted();
     }
-
-    if (mReplicationContext->isReplicating() == true)
+    catch (const std::exception& e)
     {
-        replicateState(mSessionStateItem, 0, 0, 0, 0);
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
 }
 
@@ -709,34 +809,97 @@ void RTPSessionImpl::release(const Ice::Current&)
 /**
  * Implementation of the associatePayloads method as defined in MediaRTPIf.ice
  */
-void RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
+void RTPSessionImpl::associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr& context, const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
 {
-    mSessionStateItem->payloadstoFormats = mappings;
-    associatePayloadsImpl(mappings);
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
 
-    // Only the session has changed so push a single update out for it
-    replicateState(mSessionStateItem, 0, 0, 0, 0);
+    try
+    {
+        mSessionStateItem->payloadstoFormats = mappings;
+        associatePayloadsImpl(mappings);
+
+        // Only the session has changed so push a single update out for it
+        replicateState(mSessionStateItem, 0, 0, 0, 0);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
+    }
 }
 
 
-void RTPSessionImpl::setOptions(const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::setOptions(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
 {
-    SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
-    if (!srtpTransport)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
     {
-        throw SRTPUnavailable();
+        // retry detected
+        return;
+    }
+
+    try
+    {
+        SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+        if (!srtpTransport)
+        {
+            throw SRTPUnavailable();
+        }
+        srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
-    srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
 }
 
-void RTPSessionImpl::start(const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::start(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
 {
-    SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
-    if (!srtpTransport)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
     {
-        throw SRTPUnavailable();
+        // retry detected
+        return;
+    }
+
+    try
+    {
+        SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+        if (!srtpTransport)
+        {
+            throw SRTPUnavailable();
+        }
+        srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
-    srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
 }
 
 /**
@@ -927,7 +1090,7 @@ void RTPSessionImpl::replicateState(
         return;
     }
 
-    mReplicationContext->getReplicator().tryOneWay()->setState(items);
+    mReplicationContext->getReplicator().tryOneWay()->setState(createContext(), items);
 }
 
 /**
@@ -964,7 +1127,7 @@ void RTPSessionImpl::removeState(const RTPSessionStateItemPtr& session, const RT
         return;
     }
 
-    mReplicationContext->getReplicator().tryOneWay()->removeState(items);
+    mReplicationContext->getReplicator().tryOneWay()->removeState(createContext(), items);
 }
 
 void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
@@ -1033,9 +1196,9 @@ RTPSessionPrx RTPSessionImpl::activate(
             outputs->eventSinks.push_back(mTelephonyEventSinkPrx);
         }
 
-        mRtcpSessionInterface = new RTCPSessionImpl(this);
-        mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx);
-        mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx);
+        mRtcpSessionInterface = new RTCPSessionImpl(this, mOperationContextCache);
+        mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx, mOperationContextCache);
+        mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx, mOperationContextCache);
         mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
         mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
         mAdapter->addFacet(mRtcpSessionInterface, id, RTCP::V1::SessionFacet);
@@ -1146,10 +1309,13 @@ private:
     RTPSessionImplPtr mImpl;
 };
 
+/**
+ * Factory method used by active component.
+ */
 RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
         const std::string& id, const RTPServiceLocatorParamsPtr& params,
-        const PJMEDIAEnvironmentPtr& environment, 
-        const RTPReplicationContextPtr& replicationContext, 
+        const PJMEDIAEnvironmentPtr& environment,
+        const RTPReplicationContextPtr& replicationContext,
         const ConfigurationServiceImplPtr& configuration,
         const RTPOptionsPtr& options,
         RTPAllocationOutputsPtr& outputs)
@@ -1159,19 +1325,22 @@ RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapt
     return servant->activate(id, options, outputs);
 }
 
+/**
+ * Factory method used by state replicator. 
+ */
 ReplicationAdapterPtr AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
-        const PJMEDIAEnvironmentPtr& environment, 
+        const PJMEDIAEnvironmentPtr& environment,
         const RTPSessionStateItemPtr& item,
-        const RTPReplicationContextPtr& replicationContext, 
+        const RTPReplicationContextPtr& replicationContext,
         const ConfigurationServiceImplPtr& configuration,
         const RTPOptionsPtr& options,
         RTPAllocationOutputsPtr& outputs)
 {
-    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, 
-                    adapter->getCommunicator()->identityToString(item->sessionIdentity), 
+    RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
+                    adapter->getCommunicator()->identityToString(item->sessionIdentity),
                     environment,
                     item->port, item->formats, item->ipv6, item->srtp,
-                    replicationContext, 
+                    replicationContext,
                     configuration));
 
     servant->setCookies(item->cookies);
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 9ea743c..72c3f5f 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -26,24 +26,28 @@
 #include "RTPStateReplicationIf.h"
 #include "RTPTelephonyEventSink.h"
 
+#include <boost/asio/detail/socket_ops.hpp>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
-#include <boost/asio/detail/socket_ops.hpp>
 
 #include <pjlib.h>
 #include <pjmedia.h>
 
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace std;
 
 /**
  * Private implementation details for the StreamSinkRTPImpl class.
@@ -55,10 +59,14 @@ public:
      * Constructor for our StreamSinkRTPImplPriv class.
      */
     StreamSinkRTPImplPriv(
-            const SessionAdapterPtr& sessionAdapter, 
+            const SessionAdapterPtr& sessionAdapter,
             const PJMEDIATransportPtr& transport,
             const std::string&);
 
+    boost::shared_mutex mMutex;
+
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * A structure containing outgoing pjmedia session data.
      */
@@ -84,18 +92,19 @@ public:
     /**
      * The parent session's id.
      */
-    string mSessionId;
+    const string mSessionId;
 };
 
 /**
  * Constructor for the StreamSinkRTPImplPriv class.
  */
 StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(
-        const SessionAdapterPtr& session, 
+        const SessionAdapterPtr& session,
         const PJMEDIATransportPtr& transport,
         const string& sessionId) :
-    mSessionAdapter(session), mTransport(transport), 
-    mSinkStateItem(new RTPStreamSinkStateItem), 
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+    mSessionAdapter(session), mTransport(transport),
+    mSinkStateItem(new RTPStreamSinkStateItem),
     mSessionId(sessionId)
 {
     pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
@@ -117,6 +126,7 @@ StreamSinkRTPImpl::StreamSinkRTPImpl(
 
 TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mTelephonyEventSink =
         new RTPTelephonyEventSink(
             &mImpl->mOutgoingSession,
@@ -129,6 +139,7 @@ TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAda
 
 RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mTelephonyEventSink;
 }
 
@@ -137,6 +148,7 @@ RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
  */
 void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
 {
+//    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     // Don't even bother if no remote address information is present
     if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
     {
@@ -212,8 +224,17 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
 /**
  * Implementation of the setSource method as defined in MediaIf.ice
  */
-void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
+void StreamSinkRTPImpl::setSource(
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(context))
+    {
+        std::clog << "!!! RETRY !!!\n";
+        // retry detected
+        return;
+    }
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->source = source;
 
     mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -224,6 +245,7 @@ void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&
  */
 AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSinkStateItem->source;
 }
 
@@ -232,6 +254,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::
  */
 AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSessionAdapter->getFormats();
 }
 
@@ -240,6 +263,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Curre
  */
 std::string StreamSinkRTPImpl::getId(const Ice::Current&)
 {
+    // Immutable; no lock needed
     /* For now utilize the id of the session */
     return mImpl->mSessionId;
 }
@@ -247,20 +271,45 @@ std::string StreamSinkRTPImpl::getId(const Ice::Current&)
 /**
  * Implementation of the setRemoteDetails method as defined in MediaRTPIf.ice
  */
-void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, const Ice::Current&)
+void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context,
+    const string& address, Ice::Int port, const Ice::Current&)
 {
-    /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
-     * actually attaching the transport.
-     */
-    mImpl->mSessionAdapter->setRemoteDetails(address, port);
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
-    /* We do store it though in case we have not yet received a packet from the remote side but
-     * are asked for the remote address. It is also stored for replication purposes.
-     */
-    mImpl->mSinkStateItem->remoteAddress = address;
-    mImpl->mSinkStateItem->remotePort = port;
+    if (!data)
+    {
+        std::clog << "!!! RETRY !!!\n";
+        // retry detected
+        return;
+    }
 
-    mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+    try
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+        /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
+         * actually attaching the transport.
+         */
+        mImpl->mSessionAdapter->setRemoteDetails(address, port);
+
+        /* We do store it though in case we have not yet received a packet from the remote side but
+         * are asked for the remote address. It is also stored for replication purposes.
+         */
+        mImpl->mSinkStateItem->remoteAddress = address;
+        mImpl->mSinkStateItem->remotePort = port;
+
+        mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
+    }
 }
 
 /**
@@ -268,6 +317,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, c
  */
 std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
     {
         string address = mImpl->mTransport->remoteAddress()->hostname();
@@ -281,6 +331,7 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
  */
 Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
     {
         int port = mImpl->mTransport->remoteAddress()->port();
@@ -294,11 +345,13 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
  */
 RTPStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSinkStateItem;
 }
 
 RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateItem()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     if (mImpl->mTelephonyEventSink)
     {
         return mImpl->mTelephonyEventSink->getStateItem();
@@ -308,17 +361,20 @@ RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateI
 
 void StreamSinkRTPImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->remoteAddress = host;
     mImpl->mSinkStateItem->remotePort = port;
 }
 
 void StreamSinkRTPImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->source = proxy;
 }
 
 Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEncode, const AudioFormatPtr& audioFormat)
 {
+    // mImpl->mMutex should already be locked
     if (audioFormat->sampleSize == 8)
     {
         ByteSeqPayloadPtr bytePayload = ByteSeqPayloadPtr::dynamicCast(toEncode);
@@ -351,6 +407,7 @@ Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEnco
 
 Ice::ByteSeq StreamSinkRTPImpl::encodeVideoPayload(const FramePayloadPtr& toEncode, const VideoFormatPtr&)
 {
+    // no shared state, no need to lock
... 1175 lines suppressed ...


-- 
asterisk-scf/release/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list