[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Apr 9 16:27:15 CDT 2012


branch "retry_deux" has been updated
       via  48cccf1cb79f3a76c2dc572996486f7e667ad664 (commit)
      from  cb5c5931ba2a559c17ba6fb47356af22524c9a0e (commit)

Summary of changes:
 src/Component.cpp                  |   68 ++++++--
 src/ICETransport.cpp               |   80 +++++----
 src/RTPConfiguration.cpp           |  129 ++++++++------
 src/RTPSession.cpp                 |  342 +++++++++++++++++++++++++----------
 src/RTPSink.cpp                    |   94 ++++++++--
 src/RTPSource.cpp                  |   28 ++--
 src/RTPStateReplicatorListener.cpp |   47 +++--
 src/RTPTelephonyEventSink.cpp      |   40 ++++-
 src/RTPTelephonyEventSink.h        |    5 +-
 src/RTPTelephonyEventSource.cpp    |    2 +
 test/TestRTPpjmedia.cpp            |   90 +++++++++-
 11 files changed, 658 insertions(+), 267 deletions(-)


- Log -----------------------------------------------------------------
commit 48cccf1cb79f3a76c2dc572996486f7e667ad664
Author: David M. Lee <dlee at digium.com>
Date:   Fri Apr 6 15:46:43 2012 -0500

    Retry logic, thready safety, formatting.

diff --git a/src/Component.cpp b/src/Component.cpp
index 673af44..55bb668 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -24,39 +24,45 @@
 #include <boost/shared_ptr.hpp>
 #include <AsteriskSCF/Operations/OperationContext.h>
 
+#include <AsteriskSCF/Component/Component.h>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Logger/IceLogger.h>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-#include <AsteriskSCF/Logger/IceLogger.h>
-#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Component/Component.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
+#include "PJMEDIAEnvironment.h"
+#include "RTPConfiguration.h"
+#include "RTPConfigurationIf.h"
 #include "RTPReplicationContext.h"
 #include "RTPSession.h"
 #include "RTPStateReplicator.h"
-#include "RTPConfiguration.h"
-#include "RTPConfigurationIf.h"
-#include "PJMEDIAEnvironment.h"
 
-using namespace std;
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
+using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Replication;
 using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace AsteriskSCF::System::Logging;
-using namespace AsteriskSCF::Discovery;
-using namespace AsteriskSCF::Replication;
-using namespace AsteriskSCF::PJMEDIARTP;
+using namespace std;
 
 namespace
 {
 Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
+typedef ContextResultData<RTPSessionPrx> AllocateResultData;
+typedef boost::shared_ptr<AllocateResultData> AllocateResultDataPtr;
+
 }
 
 static const string ReplicaServiceId("MediaRTPReplica");
@@ -88,6 +94,7 @@ public:
     }
 
 private:
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     PJMEDIAEnvironmentPtr mEnvironment;
     RTPReplicationContextPtr mReplicationContext;
@@ -252,6 +259,7 @@ void Component::onResume()
 RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
     const RTPReplicationContextPtr& replicationContext,
     const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mAdapter(adapter),
     mEnvironment(PJMEDIAEnvironment::create(adapter->getCommunicator()->getProperties(), configurationService)),
     mReplicationContext(replicationContext),
@@ -263,13 +271,24 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter,
  * Implementation of the allocate method as defined in MediaRTPIf.ice
  */
 RTPSessionPrx RTPMediaServiceImpl::allocate(
-    const AsteriskSCF::System::V1::OperationContextPtr&,
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const RTPServiceLocatorParamsPtr& params,
     const RTPOptionsPtr& options,
     RTPAllocationOutputsPtr& outputs,
     const Ice::Current&)
 {
-    return AsteriskSCF::PJMEDIARTP::RTPSession::create(
+    std::pair<bool, AllocateResultDataPtr> data =
+        getContextSync<AllocateResultDataPtr>(mOperationContextCache, context);
+
+    if (data.first)
+    {
+        // retry detected
+        return data.second->getResult();
+    }
+
+    try
+    {
+        RTPSessionPrx r = AsteriskSCF::PJMEDIARTP::RTPSession::create(
             mAdapter,
             IceUtil::generateUUID(),
             params,
@@ -278,6 +297,19 @@ RTPSessionPrx RTPMediaServiceImpl::allocate(
             mConfigurationService,
             options,
             outputs);
+        data.second->setResult(r);
+        return r;
+    }
+    catch (const std::exception& e)
+    {
+        data.second->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data.second->setException();
+        throw;
+    }
 }
 
 void Component::onPreInitialize()
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index 9553281..468bb30 100755
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -14,39 +14,38 @@
  * at the top of the source tree.
  */
 
-#include "ICETransport.h"
-#include "PJUtil.h"
-
-#include <pjmedia.h>
 #include <pjlib.h>
+#include <pjmedia.h>
 #include <pjnath.h>
 
-#include <AsteriskSCF/System/ExceptionsIf.h>
 #include <map>
+#include <sstream>
+
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
-#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
 #include <Ice/Ice.h>
-#include <sstream>
-#include <AsteriskSCF/Logger.h>
 #include <IceUtil/UUID.h>
 
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <AsteriskSCF/System/NAT/NATTraversalIf.h>
+
+#include "ICETransport.h"
+#include "PJUtil.h"
+
+using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
-using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF::PJUtil;
-using namespace std;
-using namespace AsteriskSCF::Helpers;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::NAT::V1;
+using namespace AsteriskSCF::System::V1;
+using namespace std;
 
 namespace
 {
 Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-}
-
-namespace 
-{
 
 class ICEAgentImpl : public InteractiveConnectionAgent
 {
@@ -54,6 +53,7 @@ public:
 
     ICEAgentImpl(const Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id, const PJMEDIAEnvironmentPtr& env,
         const PJMEDIAEndpointPtr& ep) :
+        mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
         mAdapter(adapter),
         mId(id),
         mShuttingDown(false),
@@ -84,11 +84,22 @@ public:
         return mRole;
     }
 
+    typedef AMDContextResultData<CandidatePtr, AMD_InteractiveConnectionAgent_negotiatePtr> NegotiateContextData;
+    typedef boost::shared_ptr<NegotiateContextData> NegotiateContextDataPtr;
+
     void negotiate_async(const AMD_InteractiveConnectionAgent_negotiatePtr& callback,
-            const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::System::V1::OperationContextPtr& context,
             const string& hostname, Ice::Int port, const CandidateSeq& candidates,
             const Ice::Current&)
     {
+        NegotiateContextDataPtr data =
+            getContext<NegotiateContextDataPtr>(mOperationContextCache, context, callback);
+
+        if (!data)
+        {
+            // retry detected
+            return;
+        }
 
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         stateCheck();
@@ -100,12 +111,12 @@ public:
             // TODO: are we going to support cancellable negotiations.
             //
         }
-        mCurrentNegotiation = callback;
+        mCurrentNegotiation = data->getProxy();
 
         //
-        // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal 
+        // So how this works is we create a remote SDP and call pjmedia_transport_start() easy peasy. (Same deal
         //
-        pjmedia_sdp_session* remoteSDPSession = 
+        pjmedia_sdp_session* remoteSDPSession =
             static_cast<pjmedia_sdp_session*>(pj_pool_zalloc(mEnv->memoryPool(), sizeof(pjmedia_sdp_session)));
 
 
@@ -179,7 +190,7 @@ public:
 
         //
         // I was concerned about the fact that for a given SIP session, there might be multiple media
-        // streams and multiple candidates. I'm not sure that its actually too much of an issue even 
+        // streams and multiple candidates. I'm not sure that its actually too much of an issue even
         // if multiple media types are muxed on a single ICE negotiated flow, but there will need to be
         // some redesign to pull in the multiple media streams associated with the session. For the moment
         // we will operation under the premise that we are dealing with a single media stream.
@@ -192,7 +203,7 @@ public:
         {
             CandidatePtr candidate = *i;
             ostringstream os;
-            os << "candidate:" << candidate->foundation << ' ' << candidate->componentId <<  " UDP " << 
+            os << "candidate:" << candidate->foundation << ' ' << candidate->componentId <<  " UDP " <<
                 candidate->priority << ' ' << candidate->mappedAddress << ' ' << candidate->mappedPort << " typ ";
             string hostType;
             switch (candidate->type)
@@ -217,7 +228,7 @@ public:
             }
             string t = os.str();
             pj_str_t candidateStr = pj_str(const_cast<char*>(t.c_str()));
-            pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(), 
+            pjmedia_sdp_attr* newAttribute = pjmedia_sdp_attr_create(mEnv->memoryPool(),
                 "candidate", &candidateStr);
             pjmedia_sdp_attr_add(&currentMedia->attr_count, currentMedia->attr, newAttribute);
         }
@@ -426,7 +437,7 @@ public:
                             candidateObj->mappedAddress = candidateObj->baseAddress;
                             candidateObj->mappedPort = candidateObj->basePort;
                             candidateObj->baseAddress = baseAddress;
-                            candidateObj->basePort = basePort; 
+                            candidateObj->basePort = basePort;
                         }
                         else
                         {
@@ -494,6 +505,7 @@ public:
 
 private:
     boost::shared_mutex mLock;
+    OperationContextCachePtr mOperationContextCache;
     Ice::ObjectAdapterPtr mAdapter;
     Ice::Identity mId;
     bool mShuttingDown;
@@ -554,7 +566,7 @@ boost::shared_mutex ICECallbackAdapter::mLock;
 // invoked before we get a chance to add the transport.  The solution to that is to allow an entry to be created when
 // the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
 // simply update the appropriate field.
-// 
+//
 void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const ICETransportPtr& callback)
 {
     bool alreadyKnown = false;
@@ -627,7 +639,7 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
 {
     //
     // AFAICT, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
-    // 
+    //
     switch (operation)
     {
         case PJ_ICE_STRANS_OP_INIT:
@@ -685,18 +697,18 @@ void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_stra
         case PJ_ICE_STRANS_OP_KEEP_ALIVE:
             //
             // Keep alive has successfully completed. FWICT this should not get here.
-            // 
+            //
             break;
-    };
+    }
 }
 
-}
+} // anonymous namespace
 
 ICETransport::~ICETransport()
 {
     //
     // TODO : cleanup ICE transport, the transport itself is closed by the parent class.
-    // 
+    //
     ICECallbackAdapter::removeEntry(mTransport);
 }
 
@@ -729,7 +741,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
         {
             //
             // Address has changed! We need to let Session listeners know!
-            // TODO! 
+            // TODO!
             //
             pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
         }
@@ -739,7 +751,7 @@ void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
     mMonitor.notify_one();
 }
 
-AddressPtr ICETransport::localAddress() 
+AddressPtr ICETransport::localAddress()
 {
     boost::unique_lock<boost::mutex> lock(mLock);
     if (mLocalAddress)
@@ -753,7 +765,7 @@ AddressPtr ICETransport::localAddress()
     return mLocalAddress;
 }
 
-AddressPtr ICETransport::remoteAddress() 
+AddressPtr ICETransport::remoteAddress()
 {
     boost::unique_lock<boost::mutex> lock(mLock);
     return mRemoteAddress;
@@ -792,7 +804,7 @@ void ICETransport::start()
     PJICECallbackPtr callback(new pjmedia_ice_cb);
     callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
     NATModulePtr natModule = NATModule::create(mConfig, mEndpoint);
-    pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1), 
+    pj_status_t result = pjmedia_ice_create(mEndpoint->endpoint(), "ASCF_ICE_MEDIA", (mEnableRTCP ? 2 : 1),
         natModule->configuration(), callback.get(), &t);
     if (fail(result))
     {
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index 69d3fc8..178bf22 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,19 +14,22 @@
  * at the top of the source tree.
  */
 
-#include "RTPConfigurationIf.h"
-#include "RTPConfiguration.h"
 
 #include <IceUtil/UUID.h>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
 #include <boost/thread.hpp>
-#include <boost/thread/shared_mutex.hpp>
 
-using namespace AsteriskSCF::System::Configuration::V1;
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+
+#include "RTPConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::System::Configuration::V1;
 using namespace std;
-using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 
 /**
  * Implementation of the configuration service.
@@ -34,16 +37,17 @@ using namespace AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1;
 class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
 {
 public:
+    ConfigurationServiceServant() : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)) {}
 
     /**
      * AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
      */
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-    
+
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
-    
+
     AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
     getConfigurationGroups(const Ice::Current&);
 
@@ -90,12 +94,12 @@ public:
     {
         mICEConfig = newConfig;
     }
-    
+
     void replaceConfig(const NATConfigPtr& newConfig)
     {
         mNATConfig = newConfig;
     }
-    
+
     void replaceConfig(const SRTPConfigurationPtr& newConfig)
     {
         mSRTPConfig = newConfig;
@@ -121,7 +125,7 @@ public:
         //
         if (mGeneralGroup)
         {
-            ConfigurationItemDict::const_iterator item = 
+            ConfigurationItemDict::const_iterator item =
                 mGeneralGroup->configurationItems.find(EnableSRTPItemName);
             if (item != mGeneralGroup->configurationItems.end())
             {
@@ -150,6 +154,7 @@ public:
     }
 
 private:
+    OperationContextCachePtr mOperationContextCache;
     /**
      * General RTP configuration
      */
@@ -223,7 +228,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
         void visitRTPICEConfigurationGroup(const RTPICEConfigurationGroupPtr& group)
         {
             RTPICEConfigurationGroupPtr currentGroup = mImpl->getICEConfigurationGroup();
-            
+
             if (!currentGroup)
             {
                 return;
@@ -233,19 +238,19 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfiguration(
                     returnedGroup->configurationItems);
             mGroups.push_back(returnedGroup);
         }
-        
+
         ConfigurationServiceServantPtr mImpl;
         ConfigurationGroupSeq& mGroups;
     };
-    
+
     ConfigurationGroupSeq newGroups;
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
-    
+
     return newGroups;
 }
 
@@ -259,7 +264,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
             mImpl(impl), mGroups(visitorGroups)
         {
         }
- 
+
     private:
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
         {
@@ -278,21 +283,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationAll(
                 mGroups.push_back(g);
             }
         }
-        
+
         ConfigurationServiceServantPtr mImpl;
         ConfigurationGroupSeq& mGroups;
     };
-    
+
     ConfigurationGroupSeq newGroups;
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this, newGroups);
 
     boost::shared_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
     }
-    
+
     return newGroups;
 }
 
@@ -300,7 +305,7 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
 {
     ConfigurationGroupSeq groups;
     boost::shared_lock<boost::shared_mutex> lock(mLock);
-   
+
     if (mGeneralGroup)
     {
         groups.push_back(new RTPGeneralGroup);
@@ -309,21 +314,21 @@ ConfigurationGroupSeq ConfigurationServiceServant::getConfigurationGroups(const
     {
         groups.push_back(new RTPICEConfigurationGroup);
     }
-    
+
     return groups;
 }
 
-void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr&, const ConfigurationGroupSeq& groups,
+void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1::OperationContextPtr& context, const ConfigurationGroupSeq& groups,
     const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
- 
+
     private:
         /**
          * Helper function which performs serial number checking of items
@@ -340,22 +345,22 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
                 {
                     continue;
                 }
-  
+
                 ConfigurationItemDict::const_iterator localItem = localItems.find(item->first);
-  
+
                 if (localItem == localItems.end())
                 {
                     // This is a new item so serial checking does not apply
                     continue;
                 }
-  
+
                 if (item->second->serialNumber < localItem->second->serialNumber)
                 {
                     throw SerialConflict(group, item->second);
                 }
             }
         }
- 
+
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
         {
             RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -367,7 +372,7 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
             {
                 performSerialCheck(group->configurationItems, g->configurationItems, group);
             }
-     
+
             for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
                  item != group->configurationItems.end();
                  ++item)
@@ -415,7 +420,7 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
                         mImpl->replaceConfig(ICEConfiguration::create(mMaxCandidates, mMaxCalls));
                     }
                 }
-                
+
 
                 void visitSTUNServerItem(const STUNServerItemPtr& item)
                 {
@@ -454,11 +459,11 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
                         mMaxCandidates = item->maxCandidates;
                     }
                 }
-                
+
             private:
 
                 ConfigurationServiceServantPtr mImpl;
-                
+
                 bool mCreateRTPICEConfig;
                 bool mCreateNATConfig;
 
@@ -481,7 +486,7 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
             }
 
             RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
-     
+
             for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
                  item != group->configurationItems.end();
                  ++item)
@@ -495,7 +500,13 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
 
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
 
     boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -506,13 +517,13 @@ void ConfigurationServiceServant::setConfiguration(const AsteriskSCF::System::V1
 }
 
 void ConfigurationServiceServant::removeConfigurationItems(
-    const AsteriskSCF::System::V1::OperationContextPtr&,
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
@@ -536,7 +547,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                 localItems.erase(localItem);
             }
         }
- 
+
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr& group)
         {
             RTPGeneralGroupPtr g = mImpl->getGeneralGroup();
@@ -551,7 +562,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
             //
             // The Visitor in this case interprets the provided information
             // to decide what it needs to do. This type of thing might be better moved
-            // into a separate helper class (along with the set operations), but 
+            // into a separate helper class (along with the set operations), but
             // we'll leave it here for now to avoid inconsistencies with the
             // rest of the configuration implementation.
             //
@@ -566,7 +577,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                     mResetICEConfig(false)
                 {
                 }
-                
+
                 ~Visitor()
                 {
                     if (mDisable)
@@ -602,7 +613,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                         mImpl->replaceConfig(ICEConfigurationPtr());
                     }
                 }
-                
+
                 void visitSTUNServerItem(const STUNServerItemPtr& item)
                 {
                     if (item)
@@ -634,7 +645,7 @@ void ConfigurationServiceServant::removeConfigurationItems(
                         mResetICEConfig = true;
                     }
                 }
-                
+
             private:
                 ConfigurationServiceServantPtr mImpl;
                 bool mRemoveSTUNServer;
@@ -646,21 +657,27 @@ void ConfigurationServiceServant::removeConfigurationItems(
             };
 
             RTPICEConfigurationGroupPtr g = mImpl->getICEConfigurationGroup();
-            
+
             if (g)
             {
                 RTPConfigurationItemVisitorPtr v(new Visitor(mImpl));
                 removeItems(v, group->configurationItems, g->configurationItems);
             }
         }
- 
+
     private:
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
@@ -668,17 +685,17 @@ void ConfigurationServiceServant::removeConfigurationItems(
 }
 
 void ConfigurationServiceServant::removeConfigurationGroups(
-        const AsteriskSCF::System::V1::OperationContextPtr&,
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
         const ConfigurationGroupSeq& groups, const Ice::Current&)
 {
     class GroupVisitor : public RTPConfigurationGroupVisitor
     {
     public:
-        GroupVisitor(const ConfigurationServiceServantPtr& impl) : 
+        GroupVisitor(const ConfigurationServiceServantPtr& impl) :
             mImpl(impl)
         {
         }
- 
+
     private:
         void visitRTPGeneralGroup(const ::AsteriskSCF::Configuration::MediaRTPPJMEDIA::V1::RTPGeneralGroupPtr&)
         {
@@ -692,11 +709,17 @@ void ConfigurationServiceServant::removeConfigurationGroups(
 
         ConfigurationServiceServantPtr mImpl;
     };
-    
+
+    if (!mOperationContextCache->addOperationContext(context))
+    {
+        // retry detected
+        return;
+    }
+
     RTPConfigurationGroupVisitorPtr v = new GroupVisitor(this);
 
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    
+
     for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
     {
         (*group)->visit(v);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 9d5f5ce..41f9a3c 100755
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -32,22 +32,24 @@
 #include <IceUtil/UUID.h>
 
 #include <AsteriskSCF/Media/MediaIf.h>
-#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
-#include <AsteriskSCF/System/Component/ReplicaIf.h>
-#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media;
-using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
-using namespace AsteriskSCF::System::Component::V1;
-using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace std;
 
 /**
  * RTCP Information Interface implementation.
@@ -55,7 +57,8 @@ using namespace AsteriskSCF::SessionCommunications::V1;
 class RTCPInformationImpl : public RTCP::V1::Information
 {
 public:
-    RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream) :
+    RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream, const OperationContextCachePtr& cache) :
+        mOperationContextCache(cache),
         mGeneralStatistics(general), mStreamStatistics(stream) { }
 
     RTCP::V1::StatisticsPtr getStatistics(const Ice::Current&)
@@ -89,14 +92,24 @@ public:
         return statistics;
     }
 
-    void addListener(const AsteriskSCF::System::V1::OperationContextPtr&, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+    void addListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
     {
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
 	boost::unique_lock<boost::shared_mutex> lock(mLock);
         mListeners.push_back(listener);
     }
 
-    void removeListener(const AsteriskSCF::System::V1::OperationContextPtr&, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+    void removeListener(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
     {
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
 	boost::unique_lock<boost::shared_mutex> lock(mLock);
         mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
     }
@@ -116,6 +129,8 @@ private:
      */
     boost::shared_mutex mLock;
 
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * Listeners present.
      */
@@ -143,22 +158,22 @@ typedef IceUtil::Handle<RTCPInformationImpl> RTCPInformationImplPtr;
 class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
 {
 public:
-    RTPSessionImpl(const Ice::ObjectAdapterPtr&, 
-            const string& id,
-            const PJMEDIAEnvironmentPtr& env,
-            const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
-	        const RTPReplicationContextPtr& replicationContext,
-	        const ConfigurationServiceImplPtr&);
-    
+    RTPSessionImpl(const Ice::ObjectAdapterPtr&,
+        const string& id,
+        const PJMEDIAEnvironmentPtr& env,
+        const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+        const RTPReplicationContextPtr& replicationContext,
+        const ConfigurationServiceImplPtr&);
+
     RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
-            const string& sessionIdentity,    
-            const PJMEDIAEnvironmentPtr& env,
-            Ice::Int port,
-            const AsteriskSCF::Media::V1::FormatSeq& formats,
-            bool isIPv6,
-            bool srtp,
-	        const RTPReplicationContextPtr& replicationContext,
-            const ConfigurationServiceImplPtr& configurationServant);
+        const string& sessionIdentity,
+        const PJMEDIAEnvironmentPtr& env,
+        Ice::Int port,
+        const AsteriskSCF::Media::V1::FormatSeq& formats,
+        bool isIPv6,
+        bool srtp,
+        const RTPReplicationContextPtr& replicationContext,
+        const ConfigurationServiceImplPtr& configurationServant);
 
     ~RTPSessionImpl();
 
@@ -169,15 +184,15 @@ public:
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     std::string getId(const Ice::Current&);
     void setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookies);
-    void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&, 
-            const AsteriskSCF::Media::V1::SessionCookies& cookies, 
+    void setCookies(const AsteriskSCF::System::V1::OperationContextPtr&,
+            const AsteriskSCF::Media::V1::SessionCookies& cookies,
             const Ice::Current&);
-    void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb, 
-                          const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet, 
+    void getCookies_async(const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
+                          const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
                           const Ice::Current&);
     void removeCookies(
             const AsteriskSCF::System::V1::OperationContextPtr&,
-            const AsteriskSCF::Media::V1::SessionCookies& cookies, 
+            const AsteriskSCF::Media::V1::SessionCookies& cookies,
             const Ice::Current&);
 
     void useRTCP(bool, const Ice::Current&);
@@ -191,7 +206,7 @@ public:
     void start(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
 
     /**
-     * Internal methods. 
+     * Internal methods.
      */
     AsteriskSCF::Media::V1::FormatSeq getFormats();
     void setRemoteDetails(const std::string& address, Ice::Int port);
@@ -199,7 +214,7 @@ public:
     int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
 
     /**
-     * Accessors for the source and sink servants. 
+     * Accessors for the source and sink servants.
      */
     StreamSourceRTPImplPtr getSourceServant();
     StreamSinkRTPImplPtr getSinkServant();
@@ -246,6 +261,8 @@ private:
 
     boost::shared_mutex mLock;
 
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * Instance of the session adapter to be passed to sinks, sources, configuration, etc.
      */
@@ -312,7 +329,7 @@ private:
      */
     RTPSessionStateItemPtr mSessionStateItem;
 
-    // Context for state replication for this component. 
+    // Context for state replication for this component.
     RTPReplicationContextPtr mReplicationContext;
 
     /**
@@ -353,7 +370,7 @@ public:
     /**
      * Constructor for this implementation.
      */
-    RTCPSessionImpl(const RTPSessionImplPtr& session) : mSession(session) { }
+    RTCPSessionImpl(const RTPSessionImplPtr& session, const OperationContextCachePtr& cache) : mOperationContetCache(cache), mSession(session) { }
 
     /**
      * Method used to retrieve the port our RTCP session is listening on.
@@ -368,12 +385,35 @@ public:
         return pj_sockaddr_get_port(&transportInfo.sock_info.rtcp_addr_name);
     }
 
-    void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr&, const std::string& address, Ice::Int port, const Ice::Current&)
+    void setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context, const std::string& address, Ice::Int port, const Ice::Current&)
     {
-        mSession->setRemoteRtcpDetails(address, port);
+        ContextDataPtr data = checkAndThrow(mOperationContetCache, context);
+        if (!data)
+        {
+            // retry detected
+            return;
+        }
+
+        try
+        {
+            mSession->setRemoteRtcpDetails(address, port);
+            data->setCompleted();
+        }
+        catch (const std::exception& e)
+        {
+            data->setException(e);
+            throw;
+        }
+        catch (...)
+        {
+            data->setException();
+            throw;
+        }
     }
 
 private:
+    OperationContextCachePtr mOperationContetCache;
+
     /**
      * Pointer to the RTP session.
      */
@@ -393,7 +433,7 @@ public:
     {
     }
 
-    void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem) 
+    void replicateState(const RTPStreamSinkStateItemPtr& sinkStateItem)
     {
         mServant->replicateState(0, sinkStateItem, 0, 0, 0);
     }
@@ -413,12 +453,12 @@ public:
         mServant->replicateState(0, 0, 0, 0, item);
     }
 
-    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload) 
+    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload)
     {
         return mServant->getFormat(payload);
     }
 
-    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) 
+    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format)
     {
         return mServant->getPayload(format);
     }
@@ -470,7 +510,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
         const PJMEDIAEnvironmentPtr& env,
         const RTPServiceLocatorParamsPtr& params,
 	    const RTPReplicationContextPtr& replicationContext,
-        const ConfigurationServiceImplPtr& configurationService) : 
+        const ConfigurationServiceImplPtr& configurationService) :
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
     mEnvironment(env),
     mEndpoint(PJMEDIAEndpoint::create(env)),
     mId(id),
@@ -598,8 +639,8 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
     return mId;
 }
 
-/** 
- * Local implementation. 
+/**
+ * Local implementation.
  */
 void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict& cookieMap)
 {
@@ -607,35 +648,56 @@ void RTPSessionImpl::setCookies(const AsteriskSCF::Media::V1::SessionCookieDict&
     mSessionStateItem->cookies = cookieMap;
 }
 
-/** 
- * Support for the corresponding API call. 
+/**
+ * Support for the corresponding API call.
  */
 void RTPSessionImpl::setCookies(
-        const AsteriskSCF::System::V1::OperationContextPtr&,
-        const AsteriskSCF::Media::V1::SessionCookies& cookies, 
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const AsteriskSCF::Media::V1::SessionCookies& cookies,
         const Ice::Current&)
 {
-    { // scope the lock
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
-             i != cookies.end();  ++i)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    try
+    {
+        { // scope the lock
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+                 i != cookies.end();  ++i)
+            {
+                mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
+            }
+        }
+
+        if (mReplicationContext->isReplicating() == true)
         {
-            mSessionStateItem->cookies[(*i)->ice_id()] = (*i);
+            replicateState(mSessionStateItem, 0, 0, 0, 0);
         }
+        data->setCompleted();
     }
-
-    if (mReplicationContext->isReplicating() == true)
+    catch (const std::exception& e)
     {
-        replicateState(mSessionStateItem, 0, 0, 0, 0);
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
 }
 
-/** 
- * Implementation of the corresponding API call. 
+/**
+ * Implementation of the corresponding API call.
  */
 void RTPSessionImpl::getCookies_async(
         const AsteriskSCF::Media::V1::AMD_Session_getCookiesPtr &cb,
-        const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet, 
+        const AsteriskSCF::Media::V1::SessionCookies& cookiesToGet,
         const Ice::Current&)
 {
     AsteriskSCF::Media::V1::SessionCookies results;
@@ -658,26 +720,47 @@ void RTPSessionImpl::getCookies_async(
     cb->ice_response(results);
 }
 
-/** 
- * Implementation of the corresponding API call. 
+/**
+ * Implementation of the corresponding API call.
  */
 void RTPSessionImpl::removeCookies(
-        const AsteriskSCF::System::V1::OperationContextPtr&,
-        const AsteriskSCF::Media::V1::SessionCookies& cookies, 
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const AsteriskSCF::Media::V1::SessionCookies& cookies,
         const Ice::Current&)
 {
-    { // scope the lock
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
-                i != cookies.end(); ++i)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    try
+    {
+        { // scope the lock
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            for (AsteriskSCF::Media::V1::SessionCookies::const_iterator i = cookies.begin();
+                 i != cookies.end(); ++i)
+            {
+                mSessionStateItem->cookies.erase((*i)->ice_id());
+            }
+        }
+
+        if (mReplicationContext->isReplicating() == true)
         {
-            mSessionStateItem->cookies.erase((*i)->ice_id());
+            replicateState(mSessionStateItem, 0, 0, 0, 0);
         }
+        data->setCompleted();
     }
-
-    if (mReplicationContext->isReplicating() == true)
+    catch (const std::exception& e)
     {
-        replicateState(mSessionStateItem, 0, 0, 0, 0);
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
 }
 
@@ -717,34 +800,97 @@ void RTPSessionImpl::release(const Ice::Current&)
 /**
  * Implementation of the associatePayloads method as defined in MediaRTPIf.ice
  */
-void RTPSessionImpl::associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
+void RTPSessionImpl::associatePayloads(const AsteriskSCF::System::V1::OperationContextPtr& context, const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
 {
-    mSessionStateItem->payloadstoFormats = mappings;
-    associatePayloadsImpl(mappings);
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
 
-    // Only the session has changed so push a single update out for it
-    replicateState(mSessionStateItem, 0, 0, 0, 0);
+    try
+    {
+        mSessionStateItem->payloadstoFormats = mappings;
+        associatePayloadsImpl(mappings);
+
+        // Only the session has changed so push a single update out for it
+        replicateState(mSessionStateItem, 0, 0, 0, 0);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
+    }
 }
 
 
-void RTPSessionImpl::setOptions(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::setOptions(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
 {
-    SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
-    if (!srtpTransport)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    try
     {
-        throw SRTPUnavailable();
+        SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+        if (!srtpTransport)
+        {
+            throw SRTPUnavailable();
+        }
+        srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
-    srtpTransport->setOptions(suiteName, key, enableAuthentication, enableEncryption);
 }
 
-void RTPSessionImpl::start(const AsteriskSCF::System::V1::OperationContextPtr&, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
+void RTPSessionImpl::start(const AsteriskSCF::System::V1::OperationContextPtr& context, const string& suiteName, const string& key, bool enableAuthentication, bool enableEncryption, const Ice::Current&)
 {
-    SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
-    if (!srtpTransport)
+    ContextDataPtr data = checkAndThrow(mOperationContextCache, context);
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    try
     {
-        throw SRTPUnavailable();
+        SRTPTransportPtr srtpTransport(boost::dynamic_pointer_cast<SRTPTransport>(mTransport));
+        if (!srtpTransport)
+        {
+            throw SRTPUnavailable();
+        }
+        srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
     }
-    srtpTransport->start(suiteName, key, enableAuthentication, enableEncryption);
 }
 
 /**
@@ -935,7 +1081,7 @@ void RTPSessionImpl::replicateState(
         return;
     }
 
-    mReplicationContext->getReplicator().tryOneWay()->setState(AsteriskSCF::Operations::createContext(), items);
+    mReplicationContext->getReplicator().tryOneWay()->setState(createContext(), items);
 }
 
 /**
@@ -972,7 +1118,7 @@ void RTPSessionImpl::removeState(const RTPSessionStateItemPtr& session, const RT
         return;
     }
 
-    mReplicationContext->getReplicator().tryOneWay()->removeState(AsteriskSCF::Operations::createContext(), items);
+    mReplicationContext->getReplicator().tryOneWay()->removeState(createContext(), items);
 }
 
 void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
@@ -1041,9 +1187,9 @@ RTPSessionPrx RTPSessionImpl::activate(
             outputs->eventSinks.push_back(mTelephonyEventSinkPrx);
         }
 
-        mRtcpSessionInterface = new RTCPSessionImpl(this);
-        mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx);
-        mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx);
+        mRtcpSessionInterface = new RTCPSessionImpl(this, mOperationContextCache);
+        mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx, mOperationContextCache);
+        mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx, mOperationContextCache);
         mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
         mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
         mAdapter->addFacet(mRtcpSessionInterface, id, RTCP::V1::SessionFacet);
@@ -1156,8 +1302,8 @@ private:
 
 RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
         const std::string& id, const RTPServiceLocatorParamsPtr& params,
-        const PJMEDIAEnvironmentPtr& environment, 
-        const RTPReplicationContextPtr& replicationContext, 
+        const PJMEDIAEnvironmentPtr& environment,
+        const RTPReplicationContextPtr& replicationContext,
         const ConfigurationServiceImplPtr& configuration,
         const RTPOptionsPtr& options,
         RTPAllocationOutputsPtr& outputs)
@@ -1168,18 +1314,18 @@ RTPSessionPrx AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapt
 }
 
 ReplicationAdapterPtr AsteriskSCF::PJMEDIARTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
-        const PJMEDIAEnvironmentPtr& environment, 
+        const PJMEDIAEnvironmentPtr& environment,
         const RTPSessionStateItemPtr& item,
-        const RTPReplicationContextPtr& replicationContext, 
+        const RTPReplicationContextPtr& replicationContext,
         const ConfigurationServiceImplPtr& configuration,
         const RTPOptionsPtr& options,
         RTPAllocationOutputsPtr& outputs)
 {
-    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, 
-                    adapter->getCommunicator()->identityToString(item->sessionIdentity), 
+    RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
+                    adapter->getCommunicator()->identityToString(item->sessionIdentity),
                     environment,
                     item->port, item->formats, item->ipv6, item->srtp,
-                    replicationContext, 
+                    replicationContext,
                     configuration));
 
     servant->setCookies(item->cookies);
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 8347a86..16faffb 100755
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -26,24 +26,28 @@
 #include "RTPStateReplicationIf.h"
 #include "RTPTelephonyEventSink.h"
 
+#include <boost/asio/detail/socket_ops.hpp>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
-#include <boost/asio/detail/socket_ops.hpp>
 
 #include <pjlib.h>
 #include <pjmedia.h>
 
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
-using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
 using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace std;
 
 /**
  * Private implementation details for the StreamSinkRTPImpl class.
@@ -55,10 +59,14 @@ public:
      * Constructor for our StreamSinkRTPImplPriv class.
      */
     StreamSinkRTPImplPriv(
-            const SessionAdapterPtr& sessionAdapter, 
+            const SessionAdapterPtr& sessionAdapter,
             const PJMEDIATransportPtr& transport,
             const std::string&);
 
+    boost::shared_mutex mMutex;
+
+    OperationContextCachePtr mOperationContextCache;
+
     /**
      * A structure containing outgoing pjmedia session data.
      */
@@ -91,11 +99,12 @@ public:
  * Constructor for the StreamSinkRTPImplPriv class.
  */
 StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(
-        const SessionAdapterPtr& session, 
+        const SessionAdapterPtr& session,
         const PJMEDIATransportPtr& transport,
         const string& sessionId) :
-    mSessionAdapter(session), mTransport(transport), 
-    mSinkStateItem(new RTPStreamSinkStateItem), 
+    mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+    mSessionAdapter(session), mTransport(transport),
+    mSinkStateItem(new RTPStreamSinkStateItem),
     mSessionId(sessionId)
 {
     pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
@@ -117,6 +126,7 @@ StreamSinkRTPImpl::StreamSinkRTPImpl(
 
 TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAdapterPtr& adapter, const Ice::Identity& id)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mTelephonyEventSink =
         new RTPTelephonyEventSink(
             &mImpl->mOutgoingSession,
@@ -129,6 +139,7 @@ TelephonyEventSinkPrx StreamSinkRTPImpl::createTelephonyEventSink(Ice::ObjectAda
 
 RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mTelephonyEventSink;
 }
 
@@ -137,6 +148,7 @@ RTPTelephonyEventSinkPtr StreamSinkRTPImpl::getTelephonyEventSink()
  */
 void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, const Ice::Current&)
 {
+//    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     // Don't even bother if no remote address information is present
     if (mImpl->mSinkStateItem->remoteAddress.empty() || !mImpl->mSinkStateItem->remotePort)
     {
@@ -213,9 +225,16 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
  * Implementation of the setSource method as defined in MediaIf.ice
  */
 void StreamSinkRTPImpl::setSource(
-    const AsteriskSCF::System::V1::OperationContextPtr&, 
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(context))
+    {
+        std::clog << "!!! RETRY !!!\n";
+        // retry detected
+        return;
+    }
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->source = source;
 
     mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
@@ -226,6 +245,7 @@ void StreamSinkRTPImpl::setSource(
  */
 AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSinkStateItem->source;
 }
 
@@ -234,6 +254,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::
  */
 AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSessionAdapter->getFormats();
 }
 
@@ -242,6 +263,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Curre
  */
 std::string StreamSinkRTPImpl::getId(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     /* For now utilize the id of the session */
     return mImpl->mSessionId;
 }
@@ -249,21 +271,45 @@ std::string StreamSinkRTPImpl::getId(const Ice::Current&)
 /**
  * Implementation of the setRemoteDetails method as defined in MediaRTPIf.ice
  */
-void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr&, 
+void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::OperationContextPtr& context,
     const string& address, Ice::Int port, const Ice::Current&)
 {
-    /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
-     * actually attaching the transport.
-     */
-    mImpl->mSessionAdapter->setRemoteDetails(address, port);
+    ContextDataPtr data = checkAndThrow(mImpl->mOperationContextCache, context);
 
-    /* We do store it though in case we have not yet received a packet from the remote side but
-     * are asked for the remote address. It is also stored for replication purposes.
-     */
-    mImpl->mSinkStateItem->remoteAddress = address;
-    mImpl->mSinkStateItem->remotePort = port;
+    if (!data)
+    {
+        std::clog << "!!! RETRY !!!\n";
+        // retry detected
+        return;
+    }
 
-    mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+    try
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
+        /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
+         * actually attaching the transport.
+         */
+        mImpl->mSessionAdapter->setRemoteDetails(address, port);
+
+        /* We do store it though in case we have not yet received a packet from the remote side but
+         * are asked for the remote address. It is also stored for replication purposes.
+         */
+        mImpl->mSinkStateItem->remoteAddress = address;
+        mImpl->mSinkStateItem->remotePort = port;
+
+        mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
+        data->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        data->setException(e);
+        throw;
+    }
+    catch (...)
+    {
+        data->setException();
+        throw;
+    }
 }
 
 /**
@@ -271,6 +317,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const AsteriskSCF::System::V1::Operatio
  */
 std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
     {
         string address = mImpl->mTransport->remoteAddress()->hostname();
@@ -284,6 +331,7 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
  */
 Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     if (mImpl->mTransport && mImpl->mTransport->remoteAddress())
     {
         int port = mImpl->mTransport->remoteAddress()->port();
@@ -297,11 +345,13 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
  */
 RTPStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     return mImpl->mSinkStateItem;
 }
 
 RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateItem()
 {
+    boost::shared_lock<boost::shared_mutex> lock(mImpl->mMutex);
     if (mImpl->mTelephonyEventSink)
     {
         return mImpl->mTelephonyEventSink->getStateItem();
@@ -311,17 +361,20 @@ RTPTelephonyEventSinkStateItemPtr StreamSinkRTPImpl::getTelephonyEventSinkStateI
 
 void StreamSinkRTPImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->remoteAddress = host;
     mImpl->mSinkStateItem->remotePort = port;
 }
 
 void StreamSinkRTPImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mMutex);
     mImpl->mSinkStateItem->source = proxy;
 }
 
 Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEncode, const AudioFormatPtr& audioFormat)
 {
+    // mImpl->mMutex should already be locked
     if (audioFormat->sampleSize == 8)
     {
         ByteSeqPayloadPtr bytePayload = ByteSeqPayloadPtr::dynamicCast(toEncode);
@@ -354,6 +407,7 @@ Ice::ByteSeq StreamSinkRTPImpl::encodeAudioPayload(const FramePayloadPtr& toEnco
 
 Ice::ByteSeq StreamSinkRTPImpl::encodeVideoPayload(const FramePayloadPtr& toEncode, const VideoFormatPtr&)
 {
+    // no shared state, no need to lock
     ByteSeqPayloadPtr bytePayload = ByteSeqPayloadPtr::dynamicCast(toEncode);
 
     if (!bytePayload)
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 20a00d0..a1cd040 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -84,15 +84,15 @@ typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
 class RtcpTransmission : public IceUtil::TimerTask
 {
 public:
-    RtcpTransmission(const SessionAdapterPtr& sessionAdapter, 
-                     const StreamSinkRTPPrx& sink, 
+    RtcpTransmission(const SessionAdapterPtr& sessionAdapter,
+                     const StreamSinkRTPPrx& sink,
                      const PJMEDIATransportPtr& transport,
-                     const ThreadDescWrapperPtr& threadDesciptor) 
-                     : mSessionAdapter(sessionAdapter), 
-                       mSink(sink), 
+                     const ThreadDescWrapperPtr& threadDesciptor)
+                     : mSessionAdapter(sessionAdapter),
+                       mSink(sink),
                        mTransport(transport),
                        mThreadDescriptor(threadDesciptor)
-    { 
+    {
     }
 
     void runTimerTask()
@@ -161,7 +161,7 @@ public:
     /**
      * Constructor for our StreamSourceRTPImplPriv class.
      */
-    StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter, 
+    StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter,
                             const PJMEDIATransportPtr& transport,
                             const string& parentSessionId,
                             const StreamSourceRTPPrx& source,
@@ -225,13 +225,13 @@ public:
 /**
  * Constructor for the StreamSourceRTPImplPriv class.
  */
-StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session, 
-                                                 const PJMEDIATransportPtr& transport, 
+StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session,
+                                                 const PJMEDIATransportPtr& transport,
                                                  const string& sessionId,
                                                  const StreamSourceRTPPrx& source,
                                                  const StreamSinkRTPPrx& sink) :
-    mSessionAdapter(session), mTransport(transport), 
-    mSourceStateItem(new RTPStreamSourceStateItem), 
+    mSessionAdapter(session), mTransport(transport),
+    mSourceStateItem(new RTPStreamSourceStateItem),
     mSessionId(sessionId),
     mSource(source),
     mSink(sink),
@@ -282,6 +282,7 @@ RTPTelephonyEventSourcePtr StreamSourceRTPImpl::getTelephonyEventSource()
  */
 void StreamSourceRTPImpl::addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
+    // naturally idempotent; no retry logic needed
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 
     // Do not allow the same sink to be added multiple times
@@ -304,6 +305,7 @@ void StreamSourceRTPImpl::addSink(const AsteriskSCF::System::V1::OperationContex
 void StreamSourceRTPImpl::removeSink(const AsteriskSCF::System::V1::OperationContextPtr&,
     const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
+    // naturally idempotent; no retry logic needed
     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 
     mImpl->mSourceStateItem->sinks.erase(std::remove(mImpl->mSourceStateItem->sinks.begin(),
@@ -345,7 +347,7 @@ void StreamSourceRTPImpl::requestFormat(
     const AsteriskSCF::System::V1::OperationContextPtr&,
     const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&)
 {
-    // We do not currently support switching formats. 
+    // We do not currently support switching formats.
     throw MediaFormatSwitchException();
 }
 
@@ -521,7 +523,7 @@ static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
 
     pjmedia_rtcp_rx_rtcp(source->mImpl->mSessionAdapter->getRtcpSession(), packet, size);
 
-    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners = 
+    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
         source->mImpl->mSessionAdapter->getReceiverReportListeners();
 
     if (listeners.empty())
diff --git a/src/RTPStateReplicatorListener.cpp b/src/RTPStateReplicatorListener.cpp
index 03498de..8e43755 100644
--- a/src/RTPStateReplicatorListener.cpp
+++ b/src/RTPStateReplicatorListener.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -19,18 +19,19 @@
 #include <IceUtil/UUID.h>
 
 #include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
 
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "RTPStateReplicator.h"
 #include "ReplicationAdapter.h"
 #include "RTPSession.h"
 
-using namespace std;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
+using namespace std;
 
 class RTPStateReplicatorItem
 {
@@ -62,33 +63,36 @@ private:
 struct RTPStateReplicatorListenerImpl
 {
 public:
-    RTPStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter, 
+    RTPStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter,
         const PJMEDIAEnvironmentPtr& env,
         const RTPGeneralStateItemPtr& generalState,
         const RTPReplicationContextPtr& replicationContext,
         const ConfigurationServiceImplPtr& configurationService)
-        : mId(IceUtil::generateUUID()), 
-          mAdapter(adapter), 
-          mEnvironment(env), 
+        : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
+          mId(IceUtil::generateUUID()),
+          mAdapter(adapter),
+          mEnvironment(env),
           mGeneralState(generalState),
           mReplicationContext(replicationContext),
           mConfigurationService(configurationService) {}
-    
+
     void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
     {
+        // naturally idempotent; no retry logic needed
+        boost::mutex::scoped_lock lock(mMutex);
         for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
         {
             // Just erasing this from the map will cause the destructor to actually shut things down
             mStateItems.erase((*key));
         }
     }
-    
-    void setStateNoticeImpl(const RTPStateItemSeq& items)
+
+    void setStateNoticeImpl(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTPStateItemSeq& items)
     {
         class visitor : public AsteriskSCF::Replication::MediaRTPPJMEDIA::V1::RTPStateItemVisitor
         {
         public:
-                visitor(RTPStateReplicatorListenerImpl *impl) : mImpl(impl)
+            visitor(RTPStateReplicatorListenerImpl *impl) : mImpl(impl)
             {
             }
 
@@ -99,7 +103,7 @@ public:
             {
                 mImpl->mGeneralState->serviceManagement = item->serviceManagement;
             }
-                
+
             void visitRTPSessionStateItem(const RTPSessionStateItemPtr &item)
             {
                 map<string, boost::shared_ptr<RTPStateReplicatorItem> >::iterator i = mImpl->mStateItems.find(item->sessionId);
@@ -121,7 +125,7 @@ public:
                 {
                     localitem = i->second;
                 }
-            
+
                 //
                 // TODO: This appears to happen in testing on occasion. Should verify if this should be
                 // expected.
@@ -131,7 +135,7 @@ public:
                     localitem->getSession()->update(item);
                 }
             }
-                
+
             void visitRTPStreamSinkStateItem(const RTPStreamSinkStateItemPtr &item)
             {
                 map<string, boost::shared_ptr<RTPStateReplicatorItem> >::iterator i =
@@ -141,7 +145,7 @@ public:
                     i->second->getSession()->update(item);
                 }
             }
-                
+
             void visitRTPStreamSourceStateItem(const RTPStreamSourceStateItemPtr &item)
             {
                 map<string, boost::shared_ptr<RTPStateReplicatorItem> >::iterator i =
@@ -174,6 +178,13 @@ public:
 
         };
 
+        if (!mOperationContextCache->addOperationContext(context))
+        {
+            // retry detected
+            return;
+        }
+
+        boost::mutex::scoped_lock lock(mMutex);
         AsteriskSCF::Replication::MediaRTPPJMEDIA::V1::RTPStateItemVisitorPtr v = new visitor(this);
 
         for (RTPStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
@@ -182,6 +193,8 @@ public:
         }
     }
 
+    boost::mutex mMutex;
+    OperationContextCachePtr mOperationContextCache;
     string mId;
     map<string, boost::shared_ptr<RTPStateReplicatorItem> > mStateItems;
     Ice::ObjectAdapterPtr mAdapter;
@@ -205,9 +218,9 @@ void RTPStateReplicatorListenerI::stateRemoved(const AsteriskSCF::System::V1::Op
     mImpl->removeStateNoticeImpl(itemKeys);
 }
 
-void RTPStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr&, const RTPStateItemSeq& items, const Ice::Current&)
+void RTPStateReplicatorListenerI::stateSet(const AsteriskSCF::System::V1::OperationContextPtr& context, const RTPStateItemSeq& items, const Ice::Current&)
 {
-    mImpl->setStateNoticeImpl(items);
+    mImpl->setStateNoticeImpl(context, items);
 }
 
 bool RTPStateReplicatorListenerI::operator==(const RTPStateReplicatorListenerI &rhs)
diff --git a/src/RTPTelephonyEventSink.cpp b/src/RTPTelephonyEventSink.cpp
index 75045a0..ecca74d 100755
--- a/src/RTPTelephonyEventSink.cpp
+++ b/src/RTPTelephonyEventSink.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2011, Digium, Inc.
+ * Copyright (C) 2011-2012, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -17,6 +17,7 @@
 #include "RTPTelephonyEventSink.h"
 
 #include <AsteriskSCF/Media/Formats/OtherFormatsIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include <IceUtil/UUID.h>
 #include <pjmedia.h>
 
@@ -41,13 +42,14 @@ const pj_uint16_t Duration = 160;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::PJMEDIARTP;
 using namespace AsteriskSCF::Replication::MediaRTPPJMEDIA::V1;
+using namespace AsteriskSCF::Operations;
 
 RTPTelephonyEventSink::RTPTelephonyEventSink(
         pjmedia_rtp_session *session,
         const PJMEDIATransportPtr& transport,
         const SessionAdapterPtr& sessionAdapter,
         const std::string& sessionId)
-    : mSession(session), mTransport(transport), mSessionAdapter(sessionAdapter), mStateItem(new RTPTelephonyEventSinkStateItem)
+    : mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)), mSession(session), mTransport(transport), mSessionAdapter(sessionAdapter), mStateItem(new RTPTelephonyEventSinkStateItem)
 {
     mStateItem->sessionId = sessionId;
     mStateItem->key = IceUtil::generateUUID();
@@ -56,15 +58,27 @@ RTPTelephonyEventSink::RTPTelephonyEventSink(
 
 void RTPTelephonyEventSink::write_async(
         const AMD_TelephonyEventSink_writePtr& cb,
-        const AsteriskSCF::System::V1::OperationContextPtr&,
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
         const TelephonyEventPtr& event,
         const Ice::Current&)
 {
+    typedef AMDContextData<AMD_TelephonyEventSink_writePtr> Data;
+    typedef boost::shared_ptr<Data> DataPtr;
+    DataPtr data = getContext<DataPtr>(mOperationContextCache, context, cb);
+
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    boost::mutex::scoped_lock lock(mMutex);
+
     BeginDTMFEventPtr beginDTMF;
     EndDTMFEventPtr endDTMF;
     ContinueDTMFEventPtr continueDTMF;
     FlashEventPtr flash;
-    
+
     pjmedia_rtp_dtmf_event payload;
     bool setMarker = false;
     bool tsRollover = false;
@@ -166,24 +180,36 @@ void RTPTelephonyEventSink::write_async(
         mSessionAdapter->replicateState(mStateItem);
     }
 
-    cb->ice_response();
+    data->getProxy()->ice_response();
 }
 
 void RTPTelephonyEventSink::setSource_async(
         const AMD_TelephonyEventSink_setSourcePtr& cb,
-        const AsteriskSCF::System::V1::OperationContextPtr&, 
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
         const TelephonyEventSourcePrx& source,
         const Ice::Current&)
 {
+    typedef AMDContextData<AMD_TelephonyEventSink_setSourcePtr> Data;
+    typedef boost::shared_ptr<Data> DataPtr;
+    DataPtr data = getContext<DataPtr>(mOperationContextCache, context, cb);
+
+    if (!data)
+    {
+        // retry detected
+        return;
+    }
+
+    boost::mutex::scoped_lock lock(mMutex);
     mStateItem->source = source;
     mSessionAdapter->replicateState(mStateItem);
-    cb->ice_response();
+    data->getProxy()->ice_response();
 }
 
 void RTPTelephonyEventSink::getSource_async(
         const AMD_TelephonyEventSink_getSourcePtr& cb,
         const Ice::Current&)
 {
+    boost::mutex::scoped_lock lock(mMutex);
     cb->ice_response(mStateItem->source);
 }
 
diff --git a/src/RTPTelephonyEventSink.h b/src/RTPTelephonyEventSink.h
index 4a73e80..de6ba11 100755
... 182 lines suppressed ...


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list