[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue Apr 10 23:10:57 CDT 2012


branch "retry_deux" has been updated
       via  4c37ced142d56129a54a1e3dc49cd378fcad8244 (commit)
      from  71eeab06a117530626725bab0c43c5835ffe600c (commit)

Summary of changes:
 src/Component.cpp               |    2 +
 src/PJSIPRegistrarModule.cpp    |   76 +++-
 src/PJSIPRegistrarModule.h      |    4 +
 src/PJSIPSessionModule.cpp      |  154 +++-----
 src/PJSIPSessionModule.h        |    4 +
 src/SIPConfiguration.cpp        |  366 +++++++++-------
 src/SIPEndpoint.cpp             |  222 +++++++---
 src/SIPRegistrarListener.cpp    |   84 +++-
 src/SIPRegistrarListener.h      |    2 +
 src/SIPSession.cpp              |  885 ++++++++++++++++++++++++++-------------
 src/SIPTelephonyEventSink.cpp   |   49 ++-
 src/SIPTelephonyEventSource.cpp |   28 +-
 src/SIPTransfer.cpp             |   48 ++-
 src/SIPTransfer.h               |    5 +
 14 files changed, 1242 insertions(+), 687 deletions(-)


- Log -----------------------------------------------------------------
commit 4c37ced142d56129a54a1e3dc49cd378fcad8244
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue Apr 10 23:10:17 2012 -0500

    Uses the OperationMonitor helpers.

diff --git a/src/Component.cpp b/src/Component.cpp
index 5a3d3ed..e353b2f 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -95,11 +95,13 @@ public:
 
     void removeAuthHook(const OperationContextPtr& operationContext, const AuthHookPrx &hook, const Ice::Current&)
     {
+        // This method can be called any number of times, so no special retry logic required.
         mPJSIPManager->removeAuthHook(hook);
     }
 
     void clearAuthHooks(const OperationContextPtr& operationContext, const Ice::Current&)
     {
+        // This method can be called any number of times, so no special retry logic required.
         mPJSIPManager->clearAuthHooks();
     }
 private:
diff --git a/src/PJSIPRegistrarModule.cpp b/src/PJSIPRegistrarModule.cpp
index dc8c175..d2df307 100644
--- a/src/PJSIPRegistrarModule.cpp
+++ b/src/PJSIPRegistrarModule.cpp
@@ -20,6 +20,7 @@
 #include <AsteriskSCF/WorkQueue/WorkQueue.h>
 #include <AsteriskSCF/WorkQueue/DefaultQueueListener.h>
 #include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "PJSIPRegistrarModule.h"
 #include "PJSIPManager.h"
@@ -32,6 +33,7 @@ using namespace AsteriskSCF::System::WorkQueue::V1;
 using namespace AsteriskSCF::System::Hook::V1;
 using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
 using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
 
 namespace
 {
@@ -148,7 +150,8 @@ void BindingWrapper::updateBinding(const std::string& callID, int cSeq, int expi
     queue->enqueueWork(new UpdateBinding(this, callID, cSeq, expiration));
 }
 
-RegistrarI::RegistrarI(const RegistrarListenerPrx& defaultListener)
+RegistrarI::RegistrarI(const RegistrarListenerPrx& defaultListener) :
+    mOperationContextCache(OperationContextCache::create(120))
 {
     lg(Debug) << "In RegistrarI constructor, should be adding a listener...";
     mQueue = new AsteriskSCF::WorkQueue::WorkQueue();
@@ -174,23 +177,44 @@ class AddListener : public Work
 public:
     AddListener(
             const AMD_Registrar_addListenerPtr& cb,
+            const OperationContextPtr& operationContext,
             const RegistrarListenerPrx& listener,
             const RegistrarIPtr& registrar)
-        : mCB(cb), mListener(listener), mRegistrar(registrar) { }
+        : mCB(cb), mOperationContext(operationContext), mListener(listener), mRegistrar(registrar) { }
 
     void execute()
     {
-        std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+        AMDContextResultData<ContactDict, AMD_Registrar_addListenerPtr>::ptr_type operationCookie = 
+            getContext<AMDContextResultData<ContactDict, AMD_Registrar_addListenerPtr> >(
+                mRegistrar->getOperationContextCache(),
+                mOperationContext,
+                mCB);
+        if (!operationCookie)
+        {
+            lg(Debug) << "Retry of addListener() detected for operation " << mOperationContext->id;
+            return;
+        }
 
-        if (std::find(listeners.begin(), listeners.end(), mListener) == listeners.end())
+        try
         {
-            lg(Debug) << "Adding Listener " << mListener->ice_getIdentity().name << " to registrar";
-            listeners.push_back(mListener);
+            std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+
+            if (std::find(listeners.begin(), listeners.end(), mListener) == listeners.end())
+            {
+                lg(Debug) << "Adding Listener " << mListener->ice_getIdentity().name << " to registrar";
+                listeners.push_back(mListener);
+            }
+            operationCookie->setResult(mRegistrar->createContactDict(mRegistrar->getBindings()));
+        }
+        catch (const std::exception& e)
+        {
+            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            assert(false);
         }
-        mCB->ice_response(mRegistrar->createContactDict(mRegistrar->getBindings()));
     }
 private:
     AMD_Registrar_addListenerPtr mCB;
+    OperationContextPtr mOperationContext;
     RegistrarListenerPrx mListener;
     RegistrarIPtr mRegistrar;
 };
@@ -201,7 +225,7 @@ void RegistrarI::addListener_async(
         const RegistrarListenerPrx& listener,
         const Ice::Current&)
 {
-    mQueue->enqueueWork(new AddListener(cb, listener, this));
+    mQueue->enqueueWork(new AddListener(cb, operationContext, listener, this));
 }
 
 class RemoveListener : public Work
@@ -209,20 +233,41 @@ class RemoveListener : public Work
 public:
     RemoveListener(
             const AMD_Registrar_removeListenerPtr& cb,
+            const OperationContextPtr& operationContext,
             const RegistrarListenerPrx& listener,
             const RegistrarIPtr& registrar)
-        : mCB(cb), mListener(listener), mRegistrar(registrar) { }
+        : mCB(cb), mOperationContext(operationContext), mListener(listener), mRegistrar(registrar) { }
 
     void execute()
     {
-        std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+        AMDContextData<AMD_Registrar_removeListenerPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_Registrar_removeListenerPtr> >(
+                mRegistrar->getOperationContextCache(),
+                mOperationContext,
+                mCB);
+        if (!operationCookie)
+        {
+            lg(Debug) << "Retry of removeListener() detected for operation " << mOperationContext->id;
+            return;
+        }
 
-        listeners.erase(std::remove(listeners.begin(), listeners.end(), mListener));
-        mCB->ice_response();
+        try
+        {
+            std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx>& listeners(mRegistrar->getListeners());
+            // Erase the whole tail returned by std::remove: the one-iterator erase() is undefined when mListener is absent.
+            listeners.erase(std::remove(listeners.begin(), listeners.end(), mListener), listeners.end());
+            operationCookie->setCompleted();
+        }
+        catch (const std::exception& e)
+        {
+            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            assert(false);
+        }
     }
 
 private:
     AMD_Registrar_removeListenerPtr mCB;
+    OperationContextPtr mOperationContext;
     RegistrarListenerPrx mListener;
     RegistrarIPtr mRegistrar;
 
@@ -234,7 +279,7 @@ void RegistrarI::removeListener_async(
         const RegistrarListenerPrx& listener,
         const Ice::Current&)
 {
-    mQueue->enqueueWork(new RemoveListener(cb, listener, this));
+    mQueue->enqueueWork(new RemoveListener(cb, operationContext, listener, this));
 }
 
 class GetAllBindings : public Work
@@ -392,6 +437,11 @@ QueuePtr RegistrarI::getQueue()
     return mQueue;
 }
 
+OperationContextCachePtr RegistrarI::getOperationContextCache()
+{
+    return mOperationContextCache;
+}
+
 class ReplicateState : public Work
 {
 public:
diff --git a/src/PJSIPRegistrarModule.h b/src/PJSIPRegistrarModule.h
index bfa9033..ea0211b 100644
--- a/src/PJSIPRegistrarModule.h
+++ b/src/PJSIPRegistrarModule.h
@@ -19,6 +19,7 @@
 #include <AsteriskSCF/Discovery/SmartProxy.h>
 #include <AsteriskSCF/SIP/SIPRegistrarIf.h>
 #include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "SIPStateReplicator.h"
 #include "PJSIPModule.h"
@@ -88,11 +89,14 @@ public:
 
     AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
 
+    AsteriskSCF::Operations::OperationContextCachePtr getOperationContextCache();
+
 private:
     BindingWrapperDict mBindings;
     AsteriskSCF::SIP::Registration::V1::ContactDict mContacts;
     std::vector<AsteriskSCF::SIP::Registration::V1::RegistrarListenerPrx> mListeners;
     AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
 };
 
 typedef IceUtil::Handle<RegistrarI> RegistrarIPtr;
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index a244e7f..9fb3d35 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -42,11 +42,14 @@
 #include <AsteriskSCF/WorkQueue/SuspendableWorkQueue.h>
 #include <AsteriskSCF/Helpers/Retry.h>
 #include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SIP::ExtensionPoint::V1;
 using namespace AsteriskSCF::System::Hook::V1;
 using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
 
 namespace
 {
@@ -195,25 +198,57 @@ SessionWorkPtr PJSIPSessionModInfo::getSessionWork()
     return mSessionWork;
 }
 
+SIPSessionCreationExtensionPoint::SIPSessionCreationExtensionPoint() :
+    mOperationContextCache(OperationContextCache::create(120))
+{
+}
+
 void SIPSessionCreationExtensionPoint::addSessionCreationHook(
     const OperationContextPtr& operationContext, 
     const SessionCreationHookPrx& hook, const Ice::Current&)
 {
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+    {
+        lg(Debug) << "SIPSessionCreationExtensionPoint::addSessionCreationHook() detected retry for operation " << operationContext->id;
+        return;
+    }
+
     boost::unique_lock<boost::shared_mutex> lock(mLock);
     mHooks.push_back(hook);
+    contextData->setCompleted();
 }
+
 void SIPSessionCreationExtensionPoint::removeSessionCreationHook(
     const OperationContextPtr& operationContext, 
     const SessionCreationHookPrx& hook, const Ice::Current&)
 {
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+    {
+        lg(Debug) << "SIPSessionCreationExtensionPoint::removeSessionCreationHook() detected retry for operation " << operationContext->id;
+        return;
+    }
+
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    mHooks.erase(std::find(mHooks.begin(), mHooks.end(), hook));
+    SessionCreationHookSeq::iterator found = std::find(mHooks.begin(), mHooks.end(), hook); // end() when the hook is not registered
+    if (found != mHooks.end()) { mHooks.erase(found); } // guard: erase(end()) is undefined behaviour
+    contextData->setCompleted();
 }
 void SIPSessionCreationExtensionPoint::clearSessionCreationHooks(
     const OperationContextPtr& operationContext, const Ice::Current&)
 {
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+    {
+        lg(Debug) << "SIPSessionCreationExtensionPoint::clearSessionCreationHooks() detected retry for operation " << operationContext->id;
+        return;
+    }
+
     boost::unique_lock<boost::shared_mutex> lock(mLock);
     mHooks.clear();
+
+    contextData->setCompleted();
 }
 
 SessionCreationHookSeq SIPSessionCreationExtensionPoint::getHooks()
@@ -394,9 +429,7 @@ public:
         mReplacedDialog(replacedDialog),
         mDestination(destination),
         mCallerID(callerID),
-        mRedirections(redirections),
-        mRetryPolicy(5, 500),
-        mOperationContext(AsteriskSCF::Operations::createContext()) { }
+        mRedirections(redirections) { }
 
 protected:
     SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -548,12 +581,9 @@ protected:
                 // Update the Party Id information on the session.
                 mSession->setSelfAsCaller();
 
-                // Setup an operation context to support retries. 
-                mOperationContext = AsteriskSCF::Operations::createContext();
-
-                SIPAMICallbackPtr amiCallback = new SIPAMICallback(listener, mSession, this, false, true);
-                SIPAMICallbackCookiePtr cookie = new SIPAMICallbackCookie(amiCallback);
-                invokeOperation(cookie);
+                SIPAMICallbackPtr cb(new SIPAMICallback(listener, mSession, this, false, true));
+                Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
+                sessionRouter->begin_routeSession(Operations::createContext(), mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d);
             }
         }
         catch (const Ice::CommunicatorDestroyedException &)
@@ -571,17 +601,6 @@ protected:
         return Complete;
     }
 
-    /**
-     * Invoke the operation. 
-     * @param cookie The cookie contains the SIPAMICallback object. Passed into the AMI operation so 
-     * that retry operations have access to the callback object. 
-     */
-    void invokeOperation(const SIPAMICallbackCookiePtr& cookie)
-    {
-        Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
-        mSessionRouter->begin_routeSession(mOperationContext, mSession->getSessionProxy(), mDestination, 0, mCallerID, mRedirections, d, cookie);
-    }
-
     SuspendableWorkResult calledBack(const Ice::AsyncResultPtr& asyncResult)
     {
         SessionRouterPrx router = SessionRouterPrx::uncheckedCast(asyncResult->getProxy());
@@ -589,25 +608,6 @@ protected:
         {
             router->end_routeSession(asyncResult);
         }
-        catch (const Ice::ConnectionLostException& cle)
-        {
-            // Assume a failover is occurring for the routing service. 
-            // This will block the WorkQueue's thread, but it's highly likely
-            // that the failover effects most of the other enqueued operations 
-            // anyway. 
-            if(mRetryPolicy.retry())
-            {
-                lg(Warning) << "SessionCreationOperation: Retrying routeSession operation.";
-
-                // Retry the operation. 
-                invokeOperation(SIPAMICallbackCookiePtr::dynamicCast(asyncResult->getCookie()));
-            }
-            else
-            {
-                lg(Error) << "SessionCreationOperation: ConnectionLostException routing session failed "  << mRetryPolicy.getMaxRetries() << " retries." ;
-                endSession(500);
-            }
-        }
         catch (const DestinationNotFoundException &)
         {
             endSession(404);
@@ -642,8 +642,6 @@ private:
     SIPSessionPtr mSession;
     CallerPtr mCallerID;
     RedirectionsPtr mRedirections;
-    RetryPolicy mRetryPolicy;
-    OperationContextPtr mOperationContext;
 };
 
 bool PJSIPSessionModule::getPrivacy(pjsip_rx_data *rdata)
@@ -1541,7 +1539,6 @@ public:
 protected:
     SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
     {
-        OperationContextPtr context = AsteriskSCF::Operations::createContext();
         lg(Debug) << "Executing HandleInviteResponseOperation" << std::endl;
         //Treat all 1XX messages we don't recognize the same as a 180
         if (mRespCode > 100 && mRespCode < 200 && mRespCode != 183)
@@ -1565,7 +1562,7 @@ protected:
                 {
                     SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
                     Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
-                    (*listener)->begin_indicated(context, mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
+                    (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), new RingingIndication(), mSession->getCookies(), d);
                 }
                 catch (const Ice::Exception &ex)
                 {
@@ -1589,7 +1586,7 @@ protected:
                     Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
                     ProgressingIndicationPtr progressing(new ProgressingIndication());
                     progressing->response = response;
-                    (*listener)->begin_indicated(context, mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
+                    (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), progressing, mSession->getCookies(), d);
                 }
                 catch (const Ice::Exception &ex)
                 {
@@ -1612,7 +1609,7 @@ protected:
                     {
                         SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
                         Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
-                        (*listener)->begin_indicated(context, mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
+                        (*listener)->begin_indicated(Operations::createContext(), mSession->getSessionProxy(), new ConnectedIndication(), mSession->getCookies(), d);
                     }
                     catch (const Ice::Exception &ex)
                     {
@@ -1622,7 +1619,7 @@ protected:
             }
         }
         mSession->setSessionOwnerId(mConnected);
-        mSession->getSessionControllerProxy()->updateConnectedLine(context, mConnected);
+        mSession->getSessionControllerProxy()->updateConnectedLine(Operations::createContext(), mConnected);
         return Complete;
     }
 
@@ -1863,7 +1860,6 @@ protected:
                 lg(Debug) << "Relating stopped state to " << listeners.size() << " listeners";
                 AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped(new AsteriskSCF::SessionCommunications::V1::StoppedIndication());
                 stopped->response = response;
-                OperationContextPtr context = AsteriskSCF::Operations::createContext();
                 for (std::vector<AsteriskSCF::SessionCommunications::V1::SessionListenerPrx>::iterator listener =
                          listeners.begin();
                      listener != listeners.end();
@@ -1871,7 +1867,7 @@ protected:
                 {
                     try
                     {
-                        (*listener)->indicated(context, session->getSessionProxy(), stopped, session->getCookies());
+                        (*listener)->indicated(Operations::createContext(), session->getSessionProxy(), stopped, session->getCookies());
                     }
                     catch (const Ice::Exception &ex)
                     {
@@ -2136,9 +2132,7 @@ public:
         : mInv(inv), 
           mContact(contact), 
           mRouter(router), 
-          mSession(session),
-          mRetryPolicy(5, 500),
-          mOperationContext(AsteriskSCF::Operations::createContext()) { }
+          mSession(session) { }
 
 protected:
     SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -2147,8 +2141,14 @@ protected:
         {
             SuspendableWorkListenerPtr listener = 0;
             SIPAMICallbackPtr cb(new SIPAMICallback(listener, mSession, this, false, true));
-            SIPAMICallbackCookiePtr cookie = new SIPAMICallbackCookie(cb);
-            invokeOperation(cookie);
+            Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
+            mRouter->begin_connectBridgedSessionsWithDestination(
+                    Operations::createContext(),
+                    mSession->getSessionProxy(),
+                    mContact,
+                    true,
+                    0,
+                    d);
         }
         catch (const Ice::CommunicatorDestroyedException&)
         {
@@ -2157,24 +2157,6 @@ protected:
         return Complete;
     }
 
-    /**
-     * Invoke the operation. 
-     * @param cookie The cookie contains the SIPAMICallback object. Passed into the AMI operation so 
-     * that retry operations have access to the callback object. 
-     */
-    void invokeOperation(const SIPAMICallbackCookiePtr& cookie)
-    {
-        Ice::CallbackPtr d = Ice::newCallback(cookie->getSIPAMICallback(), &SIPAMICallback::callback);
-        mRouter->begin_connectBridgedSessionsWithDestination(
-            mOperationContext,
-            mSession->getSessionProxy(),
-            mContact,
-            true,
-            0,
-            d,
-            cookie);
-    }
-
     SuspendableWorkResult calledBack(const Ice::AsyncResultPtr& asyncResult)
     {
         SessionRouterPrx router =
@@ -2188,28 +2170,6 @@ protected:
         {
             router->end_connectBridgedSessionsWithDestination(asyncResult);
         }
-        catch (const Ice::ConnectionLostException& cle)
-        {
-            // Assume a failover is occurring for the routing service. 
-            // This will block the WorkQueue's thread, but it's highly likely
-            // that the failover effects most of the other enqueued operations 
-            // anyway. 
-            if(mRetryPolicy.retry())
-            {
-                lg(Warning) << "HandleRedirection: Retrying connectBridgedSessionsWithDestination operation.";
-
-                // Retry the operation. 
-                invokeOperation(SIPAMICallbackCookiePtr::dynamicCast(asyncResult->getCookie()));
-            }
-            else
-            {
-                lg(Error) << "HandleRedirection: connectBridgedSessionsWithDestination failed "  << mRetryPolicy.getMaxRetries() << " retries." ;
-
-                op = PJSIP_REDIRECT_REJECT;
-                pjsip_inv_process_redirect(mInv, op, NULL);
-            }
-            return Complete;
-        }
         catch (const std::exception&)
         {
             op = PJSIP_REDIRECT_REJECT;
@@ -2224,8 +2184,6 @@ private:
     const std::string mContact;
     AsteriskSCF::Discovery::SmartProxy<SessionRouterPrx> mRouter;
     SIPSessionPtr mSession;
-    RetryPolicy mRetryPolicy;
-    OperationContextPtr mOperationContext;
 };
 
 pjsip_redirect_op PJSIPSessionModule::invOnRedirected(pjsip_inv_session* inv, const pjsip_uri* uri,
@@ -2259,8 +2217,7 @@ public:
     HandleSendReinviteResponse(pjsip_inv_session *inv, const int moduleId, pjsip_tx_data *tdata)
       : mInv(inv), 
         mModuleId(moduleId), 
-        mResponse(tdata),
-        mOperationContext(AsteriskSCF::Operations::createContext()) { }
+        mResponse(tdata) { }
 
 protected:
     SuspendableWorkResult initial(const SuspendableWorkListenerPtr&)
@@ -2299,7 +2256,7 @@ protected:
         // we can actually accept
         SIPAMICallbackPtr cb(new SIPAMICallback(0, mSession, this, false, true));
         Ice::CallbackPtr d = Ice::newCallback(cb, &SIPAMICallback::callback);
-        mSession->getSessionControllerProxy()->begin_addStreams(mOperationContext, mStreamsAdded, d);
+        mSession->getSessionControllerProxy()->begin_addStreams(Operations::createContext(), mStreamsAdded, d);
 
         return Complete;
     }
@@ -2338,7 +2295,6 @@ private:
     const int mModuleId;
     pjsip_tx_data *mResponse;
     StreamInformationDict mStreamsAdded;
-    OperationContextPtr mOperationContext;
     SIPSessionPtr mSession;
 };
 
diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index 3dc5e5a..941704b 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -27,6 +27,7 @@
 #include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
 #include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
 #include <AsteriskSCF/PJLIB/ThreadHook.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "SIPEndpointFactory.h"
 #include "SIPReplicationContext.h"
@@ -84,6 +85,8 @@ typedef IceUtil::Handle<PJSIPSessionModuleThreadPoolListener> PJSIPSessionModule
 class SIPSessionCreationExtensionPoint : public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationExtensionPoint
 {
 public:
+    SIPSessionCreationExtensionPoint();
+
     void addSessionCreationHook(
         const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
         const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& hook, 
@@ -100,6 +103,7 @@ public:
 private:
     AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookSeq mHooks;
     boost::shared_mutex mLock;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
 };
 
 typedef IceUtil::Handle<SIPSessionCreationExtensionPoint> SIPSessionCreationExtensionPointPtr;
diff --git a/src/SIPConfiguration.cpp b/src/SIPConfiguration.cpp
index 31ebf66..1017b03 100644
--- a/src/SIPConfiguration.cpp
+++ b/src/SIPConfiguration.cpp
@@ -39,6 +39,7 @@
 #include <vector>
 #include <AsteriskSCF/Operations/OperationContext.h>
 #include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::Configuration::V1;
@@ -1955,81 +1956,96 @@ void ConfigurationServiceImpl::setConfiguration(
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups,
     const Ice::Current&)
 {
-    lg(Debug) << "Configuration: setting configuration data.";
-
-    // Is this a retry for an operation we're already processing?
-    if (!mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
     {
-        // It's not quite this simple. But for now, ...
+        lg(Debug) << "ConfigurationService::setConfiguration() detected retry for operation " << operationContext->id;
         return;
     }
 
-    class GroupsVisitor : public SIPConfigurationGroupVisitor
+    lg(Debug) << "Configuration: setting configuration data.";
+
+    try
     {
-    public:
-	GroupsVisitor(const ConfigurationServiceImplPtr& impl) : 
-          mImpl(impl) 
-        { 
-        };
+        class GroupsVisitor : public SIPConfigurationGroupVisitor
+        {
+        public:
+	    GroupsVisitor(const ConfigurationServiceImplPtr& impl) : 
+              mImpl(impl) 
+            { 
+            };
 	
-    private:
+        private:
         
-	void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
-	{
-            genericSet<SIPGeneralGroupPtr>(mImpl->getData(), group);
-	}
+	    void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+	    {
+                genericSet<SIPGeneralGroupPtr>(mImpl->getData(), group);
+	    }
 	
-	void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
-	{
-            genericSet<SIPDomainGroupPtr>(mImpl->getData(), group);
-	};
+	    void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+	    {
+                genericSet<SIPDomainGroupPtr>(mImpl->getData(), group);
+	    };
 	
-	void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
-	{
-            genericSet<SIPUDPTransportGroupPtr>(mImpl->getData(), group);
-	};
+	    void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+	    {
+                genericSet<SIPUDPTransportGroupPtr>(mImpl->getData(), group);
+	    };
 	
-	void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
-	{
-            genericSet<SIPTCPTransportGroupPtr>(mImpl->getData(), group);
-	};
+	    void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+	    {
+                genericSet<SIPTCPTransportGroupPtr>(mImpl->getData(), group);
+	    };
 	
-	void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
-	{
-            genericSet<SIPTLSTransportGroupPtr>(mImpl->getData(), group);
-	};
+	    void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+	    {
+                genericSet<SIPTLSTransportGroupPtr>(mImpl->getData(), group);
+	    };
 
-        void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
-        {
-            genericSet<SIPSTUNTransportGroupPtr>(mImpl->getData(), group);
-        }
+            void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+            {
+                genericSet<SIPSTUNTransportGroupPtr>(mImpl->getData(), group);
+            }
 
-        void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
-        {
-            genericSet<SIPEndpointGroupPtr>(mImpl->getData(), group);
-        };
+            void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+            {
+                genericSet<SIPEndpointGroupPtr>(mImpl->getData(), group);
+            };
 
-        void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
-        {
-            genericSet<SIPRegistrationGroupPtr>(mImpl->getData(), group);
-        }
-        void visitIdentityGroup(const IdentityGroupPtr& group)
-        {
-            genericSet<IdentityGroupPtr>(mImpl->getData(), group);
-        }
+            void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+            {
+                genericSet<SIPRegistrationGroupPtr>(mImpl->getData(), group);
+            }
+            void visitIdentityGroup(const IdentityGroupPtr& group)
+            {
+                genericSet<IdentityGroupPtr>(mImpl->getData(), group);
+            }
 	
-	ConfigurationServiceImplPtr mImpl;
-    };
+	    ConfigurationServiceImplPtr mImpl;
+        };
     
-    SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
+        SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
     
-    postProcesses.clear();
-    for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+        postProcesses.clear();
+        for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+        {
+	    (*group)->visit(v);
+        }
+
+        runPostProcessing();
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
     {
-	(*group)->visit(v);
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
-
-    runPostProcessing();
 }
 
 void ConfigurationServiceImpl::removeConfigurationItems(
@@ -2037,150 +2053,182 @@ void ConfigurationServiceImpl::removeConfigurationItems(
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, 
     const Ice::Current&)
 {
-    // Is this a retry for an operation we're already processing?
-    if (!mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
     {
-        return; 
+        lg(Debug) << "ConfigurationService::removeConfigurationItems() detected retry for operation " << operationContext->id;
+        return;
     }
 
-    class GroupsVisitor : public SIPConfigurationGroupVisitor
+    try
     {
-    public:
-	GroupsVisitor(const ConfigurationServiceImplPtr& impl) : 
-          mImpl(impl) 
-        { 
-        };
+        class GroupsVisitor : public SIPConfigurationGroupVisitor
+        {
+        public:
+	    GroupsVisitor(const ConfigurationServiceImplPtr& impl) : 
+              mImpl(impl) 
+            { 
+            };
 	
-	void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
-	{
-            mImpl->getData()->removeFromGroup(group);
-	};
+	    void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+	    {
+                mImpl->getData()->removeFromGroup(group);
+	    };
 	
-	void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
-	{
-            mImpl->getData()->removeFromGroup(group);
-	};
+	    void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+	    {
+                mImpl->getData()->removeFromGroup(group);
+	    };
 	
-	void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
-	{
-            mImpl->getData()->removeFromGroup(group);
-	};
+	    void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+	    {
+                mImpl->getData()->removeFromGroup(group);
+	    };
 	
-	void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
-	{
-            mImpl->getData()->removeFromGroup(group);
-	};
+	    void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+	    {
+                mImpl->getData()->removeFromGroup(group);
+	    };
 	
-	void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
-	{
-            mImpl->getData()->removeFromGroup(group);
-	};
+	    void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+	    {
+                mImpl->getData()->removeFromGroup(group);
+	    };
 
-        void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
-        {
-            mImpl->getData()->removeFromGroup(group);
-        }
+            void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+            {
+                mImpl->getData()->removeFromGroup(group);
+            }
 
-        void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
-        {
-            mImpl->getData()->removeFromGroup(group);
-        };
+            void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+            {
+                mImpl->getData()->removeFromGroup(group);
+            };
 
-        void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
-        {
-            mImpl->getData()->removeFromGroup(group);
-        }
-        void visitIdentityGroup(const IdentityGroupPtr& group)
-        {
-            mImpl->getData()->removeFromGroup(group);
-        }
+            void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+            {
+                mImpl->getData()->removeFromGroup(group);
+            }
+            void visitIdentityGroup(const IdentityGroupPtr& group)
+            {
+                mImpl->getData()->removeFromGroup(group);
+            }
 
-    private:
-	ConfigurationServiceImplPtr mImpl;
-    };
+        private:
+	    ConfigurationServiceImplPtr mImpl;
+        };
     
-    SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
+        SIPConfigurationGroupVisitorPtr v = new GroupsVisitor(this);
     
-    postProcesses.clear();
-    for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+        postProcesses.clear();
+        for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+        {
+	    (*group)->visit(v);
+        }
+        runPostProcessing();
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
     {
-	(*group)->visit(v);
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
-    runPostProcessing();
 }
 
 void ConfigurationServiceImpl::removeConfigurationGroups(
     const OperationContextPtr& operationContext,
     const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, 
-const Ice::Current&)
+    const Ice::Current&)
 {
-    // Is this a retry for an operation we're already processing?
-    if (!mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
     {
+        lg(Debug) << "ConfigurationService::removeConfigurationGroups() detected retry for operation " << operationContext->id;
         return;
     }
 
-    class Visitor : public SIPConfigurationGroupVisitor
+    try
     {
-    public:
-	Visitor(const ConfigurationServiceImplPtr& impl) : mImpl(impl) { };
+        class Visitor : public SIPConfigurationGroupVisitor
+        {
+        public:
+	    Visitor(const ConfigurationServiceImplPtr& impl) : mImpl(impl) { };
 	
-    private:
-	void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
-	{
-            mImpl->getData()->remove(group);
-	};
+        private:
+	    void visitSIPGeneralGroup(const SIPGeneralGroupPtr& group)
+	    {
+                mImpl->getData()->remove(group);
+	    };
 	
-	void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
-	{
-            mImpl->getData()->remove(group);
-	};
+	    void visitSIPDomainGroup(const SIPDomainGroupPtr& group)
+	    {
+                mImpl->getData()->remove(group);
+	    };
 	
-	void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
-	{
-            mImpl->getData()->remove(group);
-	};
+	    void visitSIPUDPTransportGroup(const SIPUDPTransportGroupPtr& group)
+	    {
+                mImpl->getData()->remove(group);
+	    };
 	
-	void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
-	{
-            mImpl->getData()->remove(group);
-	};
+	    void visitSIPTCPTransportGroup(const SIPTCPTransportGroupPtr& group)
+	    {
+                mImpl->getData()->remove(group);
+	    };
 	
-	void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
-	{
-            mImpl->getData()->remove(group);
-	};
+	    void visitSIPTLSTransportGroup(const SIPTLSTransportGroupPtr& group)
+	    {
+                mImpl->getData()->remove(group);
+	    };
 
-        void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
-        {
-            mImpl->getData()->remove(group);
-        }
+            void visitSIPSTUNTransportGroup(const SIPSTUNTransportGroupPtr& group)
+            {
+                mImpl->getData()->remove(group);
+            }
 
-        void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
-        {
-            mImpl->getData()->remove(group);
-        };
+            void visitSIPEndpointGroup(const SIPEndpointGroupPtr& group)
+            {
+                mImpl->getData()->remove(group);
+            };
 
-        void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
-        {
-            mImpl->getData()->remove(group);
-        }
-        void visitIdentityGroup(const IdentityGroupPtr& group)
-        {
-            mImpl->getData()->remove(group);
-        }
+            void visitSIPRegistrationGroup(const SIPRegistrationGroupPtr& group)
+            {
+                mImpl->getData()->remove(group);
+            }
+            void visitIdentityGroup(const IdentityGroupPtr& group)
+            {
+                mImpl->getData()->remove(group);
+            }
 
-	ConfigurationServiceImplPtr mImpl;
-    };
+	    ConfigurationServiceImplPtr mImpl;
+        };
     
-    SIPConfigurationGroupVisitorPtr v = new Visitor(this);
+        SIPConfigurationGroupVisitorPtr v = new Visitor(this);
     
-    postProcesses.clear();
-    for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+        postProcesses.clear();
+        for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+        {
+	    (*group)->visit(v);
+        }
+        runPostProcessing();
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
     {
-	(*group)->visit(v);
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
-    runPostProcessing();
 }
 };
 };
diff --git a/src/SIPEndpoint.cpp b/src/SIPEndpoint.cpp
index 65ac5f4..a26d5a8 100644
--- a/src/SIPEndpoint.cpp
+++ b/src/SIPEndpoint.cpp
@@ -29,6 +29,7 @@
 #include <AsteriskSCF/Media/SDP/MediaSDPIf.h>
 #include <AsteriskSCF/Operations/OperationContextCache.h>
 #include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "NATOptions.h"
 
@@ -535,57 +536,78 @@ std::string SIPEndpoint::getId(const Ice::Current&)
 }
 
 /**
- * This version of this overloaded operation handles a slice invocation. 
+ * This version of this overloaded operation handles a remote invocation for an active component. 
  */
 AsteriskSCF::SessionCommunications::V1::SessionPrx SIPEndpoint::createSession(
-        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const OperationContextPtr& operationContext,
         const string& destination,
         const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
         const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
         const Ice::Current&)
 {
 
+    std::pair<bool, ContextResultData<AsteriskSCF::SessionCommunications::V1::SessionPrx>::ptr_type> cacheResult = 
+        getContextSync<ContextResultData<AsteriskSCF::SessionCommunications::V1::SessionPrx>::ptr_type>(
+        mImplPriv->mOperationContextCache, operationContext);
+    if (cacheResult.first)
+    {
+        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+        return cacheResult.second->getResult();
+    }
+
     std::cout << "Got call over Ice to create a session for endpoint " << mImplPriv->mName << std::endl;
     if (mImplPriv->mConfig.sessionConfig.callDirection != BOTH &&
             mImplPriv->mConfig.sessionConfig.callDirection != INBOUND)
     {
         // TODO: We should have an exception here
+        cacheResult.second->setResult(0);
         return 0;
     }
 
-    // Combine the default listeners and the argument listener.
-    vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners->getAll();
-    if (listener != 0)
+    try
     {
-        listeners.push_back(listener);
-    }
-
-    AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
-
-    SIPSessionPtr session = SIPSession::create(
-            operationContext,
-            mImplPriv->mAdapter, 
-	    this, 
-	    destination, 
-	    listeners, 
-            defaultCookies, 
-	    mImplPriv->mManager, 
-	    mImplPriv->mServiceLocator, 
-	    mImplPriv->mReplicationContext, 
-            oneShotHook,
-            mImplPriv->mConfig.sessionConfig.rtpOverIPv6, 
-	    true, 
-	    mImplPriv->mConfig, 
-            NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE, 
-                               mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
-                               mImplPriv->mConfig.transportConfig.enableNAT,
-                               mImplPriv->mConfig.sessionConfig.udptlOverICE,
-                               mImplPriv->mConfig.sessionConfig.udptlWithTURN));
-
-    mImplPriv->mSessions.push_back(session);
+        // Combine the default listeners and the argument listener.
+        vector<SessionListenerPrx> listeners = mImplPriv->mDefaultListeners->getAll();
+        if (listener != 0)
+        {
+            listeners.push_back(listener);
+        }
 
-    std::cout << "And now we're returing a session proxy..." << std::endl;
-    return session->getSessionProxy();
+        AsteriskSCF::SessionCommunications::V1::SessionCookies defaultCookies = mImplPriv->mDefaultSessionCookies->getAll();
+
+        SIPSessionPtr session = SIPSession::create(
+                operationContext,
+                mImplPriv->mAdapter, 
+	        this, 
+	        destination, 
+	        listeners, 
+                defaultCookies, 
+	        mImplPriv->mManager, 
+	        mImplPriv->mServiceLocator, 
+	        mImplPriv->mReplicationContext, 
+                oneShotHook,
+                mImplPriv->mConfig.sessionConfig.rtpOverIPv6, 
+	        true, 
+	        mImplPriv->mConfig, 
+                NATEndpointOptions(mImplPriv->mConfig.sessionConfig.rtpOverICE, 
+                                   mImplPriv->mConfig.sessionConfig.rtpICEIncludeTURN,
+                                   mImplPriv->mConfig.transportConfig.enableNAT,
+                                   mImplPriv->mConfig.sessionConfig.udptlOverICE,
+                                   mImplPriv->mConfig.sessionConfig.udptlWithTURN));
+
+        mImplPriv->mSessions.push_back(session);
+
+        std::cout << "And now we're returing a session proxy..." << std::endl;
+
+        cacheResult.second->setResult(session->getSessionProxy());
+        return session->getSessionProxy();
+    }
+    catch (const std::exception& e)
+    {
+        cacheResult.second->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
+    }
 }
 
 /**
@@ -710,28 +732,43 @@ void SIPEndpoint::addDefaultSessionListener(
     const SessionListenerPrx& listener, 
     const Ice::Current&)
 {
-    if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
     {
-        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " <<  operationContext->transactionId;
+        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
         return;
     }
 
-    mImplPriv->mDefaultListeners->add(listener);
+    try
+    {
+        mImplPriv->mDefaultListeners->add(listener);
 
-    if (mImplPriv->mReplicationContext->isReplicating() == false)
+        if (mImplPriv->mReplicationContext->isReplicating())
+        {
+            // Replicate this change.
+            SIPStateItemSeq items;
+            items.push_back(new DefaultSessionListenerItem
+                    (mImplPriv->replicaKeyName(listener), 
+                    "", 
+                     mImplPriv->mName, 
+                    listener));
+
+            mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
+        }
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
     {
-        return;
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
 
-    // Replicate this change.
-    SIPStateItemSeq items;
-    items.push_back(new DefaultSessionListenerItem
-            (mImplPriv->replicaKeyName(listener), 
-            "", 
-             mImplPriv->mName, 
-            listener));
-
-    mImplPriv->mReplicationContext->getReplicator().tryOneWay()->setState(Operations::createContext(), items);
 }
 
 void SIPEndpoint::removeDefaultSessionListener(
@@ -739,28 +776,46 @@ void SIPEndpoint::removeDefaultSessionListener(
     const SessionListenerPrx& listener, 
     const Ice::Current&)
 {
-    if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
     {
-        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " <<  operationContext->transactionId;
+        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
         return;
     }
 
-    mImplPriv->mDefaultListeners->remove(listener);
+    try
+    {
+        mImplPriv->mDefaultListeners->remove(listener);
 
-    if (mImplPriv->mReplicationContext->isReplicating() == false)
+        if (mImplPriv->mReplicationContext->isReplicating() == false)
+        {
+            contextData->setCompleted();
+            return;
+        }
+
+        // Replicate this change. 
+        SIPStateItemSeq items;
+        items.push_back(new DefaultSessionListenerItem
+                (mImplPriv->replicaKeyName(listener), 
+                "", 
+                    mImplPriv->mName, 
+                listener));
+
+        mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
     {
-        return;
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
 
-    // Replicate this change. 
-    SIPStateItemSeq items;
-    items.push_back(new DefaultSessionListenerItem
-            (mImplPriv->replicaKeyName(listener), 
-            "", 
-                mImplPriv->mName, 
-            listener));
-
-    mImplPriv->mReplicationContext->getReplicator().tryOneWay()->removeStateForItems(Operations::createContext(), items);
 }
 
 /** 
@@ -829,13 +884,29 @@ void SIPEndpoint::addDefaultSessionCookies(
     const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies, 
     const Ice::Current&)
 {
-    if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
     {
-        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " <<  operationContext->transactionId;
+        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
         return;
     }
 
-    addDefaultSessionCookies(cookies, false);
+    try
+    {
+        addDefaultSessionCookies(cookies, false);
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
+    }
 }
 
 /**
@@ -882,14 +953,29 @@ void SIPEndpoint::removeDefaultSessionCookies(
     const AsteriskSCF::SessionCommunications::V1::SessionCookies& cookies, 
     const Ice::Current&)
 {
-
-    if (!mImplPriv->mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mImplPriv->mOperationContextCache, operationContext)))
     {
-        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected and rejected duplicate operation. Id = " << operationContext->id << " transaction = " <<  operationContext->transactionId;
+        lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
         return;
     }
 
-    removeDefaultSessionCookies(cookies, false);
+    try
+    {
+        removeDefaultSessionCookies(cookies, false);
+        contextData->setCompleted();
+    }
+    catch (const Ice::Exception& ex)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(ex)));
+        throw;
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
+    }
 }
 
 void SIPEndpoint::addDefaultSessionCookie(const AsteriskSCF::SessionCommunications::V1::SessionCookiePtr& cookie)
diff --git a/src/SIPRegistrarListener.cpp b/src/SIPRegistrarListener.cpp
index 93c2ca5..6fd2084 100644
--- a/src/SIPRegistrarListener.cpp
+++ b/src/SIPRegistrarListener.cpp
@@ -14,11 +14,13 @@
  * at the top of the source tree.
  */
 #include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 #include "SIPRegistrarListener.h"
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
 
 namespace
 {
@@ -33,7 +35,7 @@ namespace SIPSessionManager
 using namespace AsteriskSCF::SIP::Registration::V1;
 
 SIPDefaultRegistrarListener::SIPDefaultRegistrarListener(const boost::shared_ptr<SIPEndpointFactory>& endpointFactory)
-    : mEndpointFactory(endpointFactory)
+    : mOperationContextCache(OperationContextCache::create(120)), mEndpointFactory(endpointFactory)
 {
     pj_caching_pool_init(&mCachingPool, NULL, 2048);
 }
@@ -79,17 +81,34 @@ void SIPDefaultRegistrarListener::contactsAdded(
      const BindingUpdateSeq& contacts, 
      const Ice::Current&)
 {
-    pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
-    for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
     {
-        SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
-        for (Ice::StringSeq::const_iterator contactIter = iter->contacts.begin();
-                contactIter != iter->contacts.end(); ++contactIter)
+        lg(Debug) << "SIPDefaultRegistrarListener::contactsAdded() detected retry for operation " << operationContext->id;
+        return;
+    }
+
+    try
+    {
+        pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
+        for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
         {
-            updateEndpoints(endpoints, pool, *contactIter);
+            SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
+            for (Ice::StringSeq::const_iterator contactIter = iter->contacts.begin();
+                    contactIter != iter->contacts.end(); ++contactIter)
+            {
+                updateEndpoints(endpoints, pool, *contactIter);
+            }
         }
+        pj_pool_release(pool);
+        contextData->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
-    pj_pool_release(pool);
 }
 
 void SIPDefaultRegistrarListener::contactsRemoved(
@@ -97,26 +116,43 @@ void SIPDefaultRegistrarListener::contactsRemoved(
     const BindingUpdateSeq& contacts, 
     const Ice::Current&)
 {
-    pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
-    for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
+    ContextDataPtr contextData;
+    if (!(contextData = Operations::checkAndThrow(mOperationContextCache, operationContext)))
     {
-        if (iter->contacts.empty())
-        {
-            //If an aor has no contacts being removed, bail now.
-            continue;
-        }
-        SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
-        for (SIPEndpointSeq::iterator endpointIter = endpoints.begin();
-                endpointIter != endpoints.end(); ++endpointIter)
+        lg(Debug) << "SIPDefaultRegistrarListener::contactsRemoved() detected retry for operation " << operationContext->id;
+        return;
+    }
+
+    try
+    {
+        pj_pool_t *pool = pj_pool_create(&mCachingPool.factory, "DefaultRegistrarListener", 256, 256, NULL);
+        for (BindingUpdateSeq::const_iterator iter = contacts.begin(); iter != contacts.end(); ++iter)
         {
-            lg(Debug) << "Removing contacts from AoR " << iter->aor;
-            //Setting the endpoint's target address to be empty is the method we currently use
-            //to make the endpoint inaccessible. Once endpoints can support multiple addresses,
-            //it'll be a bit less hacky since we can just remove a specific contact from the list.
-            (*endpointIter)->setTargetAddress("", 0);
+            if (iter->contacts.empty())
+            {
+                //If an aor has no contacts being removed, bail now.
+                continue;
+            }
+            SIPEndpointSeq endpoints = getEndpoints(pool, iter->aor);
+            for (SIPEndpointSeq::iterator endpointIter = endpoints.begin();
+                    endpointIter != endpoints.end(); ++endpointIter)
+            {
+                lg(Debug) << "Removing contacts from AoR " << iter->aor;
+                //Setting the endpoint's target address to be empty is the method we currently use
+                //to make the endpoint inaccessible. Once endpoints can support multiple addresses,
+                //it'll be a bit less hacky since we can just remove a specific contact from the list.
+                (*endpointIter)->setTargetAddress("", 0);
+            }
         }
+        pj_pool_release(pool);
+        contextData->setCompleted();
+    }
+    catch (const std::exception& e)
+    {
+        contextData->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+        assert(false);
+        throw;
     }
-    pj_pool_release(pool);
 }
 
 } // namespace SIPSessionManager
diff --git a/src/SIPRegistrarListener.h b/src/SIPRegistrarListener.h
index bfe5879..1e6e017 100644
--- a/src/SIPRegistrarListener.h
+++ b/src/SIPRegistrarListener.h
@@ -19,6 +19,7 @@
 #include <pjsip.h>
 #include <pjlib.h>
 #include <AsteriskSCF/SIP/SIPRegistrarIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "SIPEndpointFactory.h"
 
@@ -38,6 +39,7 @@ public:
     void contactsRemoved(const AsteriskSCF::System::V1::OperationContextPtr&,
         const AsteriskSCF::SIP::Registration::V1::BindingUpdateSeq&, const Ice::Current&);
 private:
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
     SIPEndpointSeq getEndpoints(pj_pool_t *pool, const std::string& aor);
     void updateEndpoints(const SIPEndpointSeq& endpoints, pj_pool_t *pool, const std::string& contact);
     boost::shared_ptr<SIPEndpointFactory> mEndpointFactory;
diff --git a/src/SIPSession.cpp b/src/SIPSession.cpp
index 982a390..c1215fb 100755
--- a/src/SIPSession.cpp
+++ b/src/SIPSession.cpp
@@ -48,6 +48,7 @@
 #include <AsteriskSCF/Collections/HandleSet.h>
 #include <AsteriskSCF/Operations/OperationContextCache.h>
 #include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 #include "NATOptions.h"
 #include "PJSIPSessionModule.h"
 
@@ -426,34 +427,40 @@ public:
      * @param sessionPriv The session for whom the information is requested from.
      */
     SetMediaSessionCookiesOperation(
-        const OperationContextPtr& sourceContext,
+        const OperationContextPtr& operationContext,
         const AsteriskSCF::Media::V1::SessionCookies& cookies, 
         const SIPSessionPtr& session)
-         : mSourceContext(sourceContext), mCookies(cookies), mSession(session)
+         : mOperationContext(operationContext), mCookies(cookies), mSession(session)
     {
     }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        // Note: Retry logic is handled by the operation that enqueues this Work. 
+
         lg(Debug) << "Executing a SetMediaSessionCookiesOperation operation";
 
         AsteriskSCF::Media::V1::SessionCookies results;
 
         // Set the cookies on all of the Session's RTP Media Sessions. 
         RTPMediaSessionDict mediaSessions = mSession->getRTPMediaSessions();
+        int count=0;
+        string modifier;
         for(RTPMediaSessionDict::iterator i = mediaSessions.begin(); 
-            i != mediaSessions.end(); ++i)
+            i != mediaSessions.end(); ++count, ++i)
         {
-            i->second->setCookies(Operations::createContext(mSourceContext), mCookies);
+            modifier = "setCookies." + boost::lexical_cast<std::string>(count);
+            i->second->setCookies(Operations::calculateOperationContext(mOperationContext, modifier), mCookies);
         }
 
         return Complete;
     }
 
 private:
-    OperationContextPtr mSourceContext;
+    OperationContextPtr mOperationContext;
     AsteriskSCF::Media::V1::SessionCookies mCookies;
     SIPSessionPtr mSession;
+    SIPSessionPrivPtr mSessionPriv;
 };
 
 /**
@@ -468,34 +475,42 @@ public:
      * @param sessionPriv The session for whom the information is requested from.
      */
     RemoveMediaSessionCookiesOperation(
-        const OperationContextPtr& sourceContext,
+        const OperationContextPtr& operationContext,
         const AsteriskSCF::Media::V1::SessionCookies& cookieTypes, 
         const SIPSessionPtr& session)
-         : mSourceContext(sourceContext), mCookieTypes(cookieTypes), mSession(session)
+         : mOperationContext(operationContext), 
+           mCookieTypes(cookieTypes), 
+           mSession(session)
     {
     }
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
+        // Note: Retry logic is handled by the operation that enqueues this Work. 
+        // Each media session call gets a context calculated from mOperationContext plus a per-index modifier (as SetMediaSessionCookiesOperation does).
         lg(Debug) << "Executing a RemoveMediaSessionCookiesOperation operation";
 
         AsteriskSCF::Media::V1::SessionCookies results;
 
         // Set the cookies on all of the Session's RTP Media Sessions. 
         RTPMediaSessionDict mediaSessions = mSession->getRTPMediaSessions();
+        int count=0;
+        string modifier;
         for(RTPMediaSessionDict::iterator i = mediaSessions.begin(); 
-            i != mediaSessions.end(); ++i)
+            i != mediaSessions.end(); ++count, ++i)
         {
-            i->second->removeCookies(Operations::createContext(mSourceContext), mCookieTypes);
+            modifier = "removeCookies." + boost::lexical_cast<std::string>(count);
+            i->second->removeCookies(Operations::calculateOperationContext(mOperationContext, modifier), mCookieTypes);
         }
 
         return Complete;
     }
 
 private:
-    OperationContextPtr mSourceContext;
+    OperationContextPtr mOperationContext;
     AsteriskSCF::Media::V1::SessionCookies mCookieTypes;
     SIPSessionPtr mSession;
+    SIPSessionPrivPtr mSessionPriv;
 };
 
 /** 
@@ -504,9 +519,11 @@ private:
 class SIPMediaSession : public Media::V1::Session
 {
 public:
-    SIPMediaSession(const SIPSessionPtr& session) 
+    SIPMediaSession(const SIPSessionPtr& session,
+        const boost::shared_ptr<SIPSessionPriv>& sessionPriv) 
         : mId(IceUtil::generateUUID()), 
-          mSession(session)
+          mSession(session),
+          mOperationContextCache(OperationContextCache::create(120))
     {
     }
 
@@ -530,7 +547,17 @@ public:
         const AsteriskSCF::Media::V1::SessionCookies& cookies, 
         const Ice::Current&)
     {
+        ContextDataPtr operationCookie;
+        if (!(operationCookie = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+        {
+            lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+            return;
+        }
+
         mSession->enqueueSessionWork(new SetMediaSessionCookiesOperation(operationContext, cookies, mSession));
+
+        // Enqueuing the operation is "complete" enough for this case.
+        operationCookie->setCompleted();
     }
 
     void getCookies_async(
@@ -546,7 +573,17 @@ public:
         const AsteriskSCF::Media::V1::SessionCookies& cookies, 
         const Ice::Current&)
     {
+        ContextDataPtr operationCookie;
+        if (!(operationCookie = Operations::checkAndThrow(mOperationContextCache, operationContext)))
+        {
+            lg(Debug) << BOOST_CURRENT_FUNCTION << " detected retry for operation " << operationContext->id;
+            return;
+        }
+
         mSession->enqueueSessionWork(new RemoveMediaSessionCookiesOperation(operationContext, cookies, mSession));
+
+        // Enqueuing the operation is "complete" enough for this case.
+        operationCookie->setCompleted();
     }
 
 private:
@@ -559,6 +596,8 @@ private:
      * A pointer to the communications session that created us.
      */
     SIPSessionPtr mSession;
+
+    OperationContextCachePtr mOperationContextCache;
 };
 
 /**
@@ -582,109 +621,120 @@ public:
     
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        // Is this a retry for an operation we're already processing?
-        if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+        AMDContextData<AMD_SessionController_changeStreamStatesPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_SessionController_changeStreamStatesPtr> >(
+                mImplPriv->mOperationContextCache,
+                mOperationContext,
+                mCb);
+        if (!operationCookie)
         {
-            lg(Debug) << "Retry of previously processed changeStreamStates() operation detected and rejected.";
+            lg(Debug) << "changeStreamStates() detected retry for operation " << mOperationContext->id;
             return Complete;
         }
 
         lg(Debug) << "Executing a changeStreamStates Operation";
 
-        // This boolean is set to true if at least one stream is actually changed. This is to prevent
-        // needless reinvites.
-        bool changed = false;
-            
-        // We iterate through each stream making sure we have one that matches it
-        for (AsteriskSCF::Media::V1::StreamStateDict::const_iterator stream = mStreams.begin();
-             stream != mStreams.end();
-             ++stream)
+        try
         {
-            AsteriskSCF::Media::V1::StreamInformationDict::iterator ourStream = mImplPriv->mStreams.find(stream->first);
-                
-            // If we don't have a stream stored locally then they gave us a stream that we really know nothing about it
-            if (ourStream == mImplPriv->mStreams.end())
+            // This boolean is set to true if at least one stream is actually changed. This is to prevent
+            // needless reinvites.
+            bool changed = false;
+            
+            // We iterate through each stream making sure we have one that matches it
+            for (AsteriskSCF::Media::V1::StreamStateDict::const_iterator stream = mStreams.begin();
+                 stream != mStreams.end();
+                 ++stream)
             {
-                continue;
-            }
+                AsteriskSCF::Media::V1::StreamInformationDict::iterator ourStream = mImplPriv->mStreams.find(stream->first);
+                
+                // If we don't have a stream stored locally then they gave us a stream that we really know nothing about
+                if (ourStream == mImplPriv->mStreams.end())
+                {
+                    continue;
+                }
 
-            // If this doesn't actually alter the state of the stream do nothing, since it would just be silly
-            if (ourStream->second->state == stream->second)
-            {
-                continue;
-            }
+                // If this doesn't actually alter the state of the stream do nothing, since it would just be silly
+                if (ourStream->second->state == stream->second)
+                {
+                    continue;
+                }
             
-            // The implementation of how we store streams and SDP are linked together, if we can find a stream
-            // in our dictionary of streams than it also exists in the SDP
-            pjmedia_sdp_media *media = mImplPriv->mSDP->media[boost::lexical_cast<int>(stream->first)];
+                // The implementation of how we store streams and SDP are linked together, if we can find a stream
+                // in our dictionary of streams then it also exists in the SDP
+                pjmedia_sdp_media *media = mImplPriv->mSDP->media[boost::lexical_cast<int>(stream->first)];
 
-            // Depending on the current state go ahead and remove the current attribute
-            if (ourStream->second->state == SendAndReceive)
-            {
-                pjmedia_sdp_media_remove_all_attr(media, "sendrecv");
-            }
-            else if (ourStream->second->state == SendOnly)
-            {
-                pjmedia_sdp_media_remove_all_attr(media, "sendonly");
-            }
-            else if (ourStream->second->state == ReceiveOnly)
-            {
-                pjmedia_sdp_media_remove_all_attr(media, "recvonly");
-            }
-            else if (ourStream->second->state == Inactive)
-            {
-                pjmedia_sdp_media_remove_all_attr(media, "inactive");
-            }
+                // Depending on the current state go ahead and remove the current attribute
+                if (ourStream->second->state == SendAndReceive)
+                {
+                    pjmedia_sdp_media_remove_all_attr(media, "sendrecv");
+                }
+                else if (ourStream->second->state == SendOnly)
+                {
+                    pjmedia_sdp_media_remove_all_attr(media, "sendonly");
+                }
+                else if (ourStream->second->state == ReceiveOnly)
+                {
+                    pjmedia_sdp_media_remove_all_attr(media, "recvonly");
+                }
+                else if (ourStream->second->state == Inactive)
+                {
+                    pjmedia_sdp_media_remove_all_attr(media, "inactive");
+                }
 
-            // Now that the old attribute is removed we can go ahead and update our state
-            ourStream->second->state = stream->second;
+                // Now that the old attribute is removed we can go ahead and update our state
+                ourStream->second->state = stream->second;
 
-            // Now we can go ahead and add in the corret attribute
-            pjmedia_sdp_attr *attr = NULL;
+                // Now we can go ahead and add in the correct attribute
+                pjmedia_sdp_attr *attr = NULL;
                 
-            if (ourStream->second->state == SendAndReceive)
-            {
-                attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendrecv", NULL);
-            }
-            else if (ourStream->second->state == SendOnly)
-            {
-                attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendonly", NULL);
-            }
-            else if (ourStream->second->state == ReceiveOnly)
-            {
-                attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "recvonly", NULL);
-            }
-            else if (ourStream->second->state == Inactive)
-            {
-                attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "inactive", NULL);
+                if (ourStream->second->state == SendAndReceive)
+                {
+                    attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendrecv", NULL);
+                }
+                else if (ourStream->second->state == SendOnly)
+                {
+                    attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "sendonly", NULL);
+                }
+                else if (ourStream->second->state == ReceiveOnly)
+                {
+                    attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "recvonly", NULL);
+                }
+                else if (ourStream->second->state == Inactive)
+                {
+                    attr = pjmedia_sdp_attr_create(mImplPriv->mDialog->pool, "inactive", NULL);
+                }
+
+                if (attr)
+                {
+                    pjmedia_sdp_media_add_attr(media, attr);
+                    changed = true;
+                }
             }
 
-            if (attr)
+            // If any streams were actually updated trigger a reinvite if need be
+            if (changed == true)
             {
-                pjmedia_sdp_media_add_attr(media, attr);
-                changed = true;
+                // TODO: Add code here to determine if we really do need to send the reinvite, if we haven't actually answered yet
+                // what we really want to do is just update our answer SDP
+                pjsip_tx_data *packet = NULL;
+
+                if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, mImplPriv->mSDP, &packet)))
+                {
+                    pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+                }
+                else
+                {
+                    lg(Warning) << "Unable to create reinvite";
+                }
             }
-        }
 
-        // If any streams were actually updated trigger a reinvite if need be
-        if (changed == true)
+            operationCookie->setCompleted();
+        }
+        catch (const std::exception& e)
         {
-            // TODO: Add code here to determine if we really do need to send the reinvite, if we haven't actually answered yet
-            // what we really want to do is just update our answer SDP
-            pjsip_tx_data *packet = NULL;
-
-            if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, mImplPriv->mSDP, &packet)))
-            {
-                pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
-            }
-            else
-            {
-                lg(Warning) << "Unable to create reinvite";
-            }
+            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            assert(false);
         }
-
-        mCb->ice_response();
-   
         return Complete;
     }
     
@@ -708,52 +758,63 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        // Is this a retry for an operation we're already processing?
-        if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+        AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr>::ptr_type operationCookie = 
+            getContext<AMDContextResultData<StreamInformationDict, AMD_SessionController_addStreamsPtr> >(
+                mImplPriv->mOperationContextCache,
+                mOperationContext,
+                mCb);
+        if (!operationCookie)
         {
-            lg(Debug) << "Retry of previously processed addStreams() operation detected and rejected.";
+            lg(Debug) << "addStreams() detected retry for operation " << mOperationContext->id;
             return Complete;
         }
 
-        lg(Debug) << "Executing an addStreams Operation";
-
-        // If there is an outstanding transaction then no streams can be added at this time
-        if (mImplPriv->mInviteSession->invite_tsx)
+        try
         {
-            mCb->ice_response(StreamInformationDict());
-            return Complete;
-        }
+            lg(Debug) << "Executing an addStreams Operation";
 
-        // Create an offer adding in the requested streams
-        StreamInformationDict added;
-        pjmedia_sdp_session *sdp = mSession->createSDPOffer(mStreams, added);
+            // If there is an outstanding transaction then no streams can be added at this time
+            if (mImplPriv->mInviteSession->invite_tsx)
+            {
+                mCb->ice_response(StreamInformationDict());
+                return Complete;
+            }
 
-        // If no streams were actually added respond back appropriately
-        if (added.empty())
-        {
-            mCb->ice_response(StreamInformationDict());
-            return Complete;
-        }
+            // Create an offer adding in the requested streams
+            StreamInformationDict added;
+            pjmedia_sdp_session *sdp = mSession->createSDPOffer(mStreams, added);
 
-        // Store callback information so when the remote party responds with which streams were accepted we can
-        // communicate it to the controller
-        mImplPriv->mAddStreamsCb = mCb;
+            // If no streams were actually added respond back appropriately
+            if (added.empty())
+            {
+                mCb->ice_response(StreamInformationDict());
+                return Complete;
+            }
+
+            // Store callback information so when the remote party responds with which streams were accepted we can
+            // communicate it to the controller
+            mImplPriv->mAddStreamsCb = mCb;
 
-        // Okay, create and send the reinvite!
-        pjsip_tx_data *packet = NULL;
+            // Okay, create and send the reinvite!
+            pjsip_tx_data *packet = NULL;
             
-        if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, sdp, &packet)))
-        {
-            pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+            if (success(pjsip_inv_reinvite(mImplPriv->mInviteSession, NULL, sdp, &packet)))
+            {
+                pjsip_inv_send_msg(mImplPriv->mInviteSession, packet);
+            }
+            else
+            {
+                // If we couldn't create the reinvite no streams were added
+                lg(Warning) << "Unable to create reinvite when adding streams";
+            }
+        
+            operationCookie->setResult(StreamInformationDict());
         }
-        else
+        catch (const std::exception& e)
         {
-            // If we couldn't create the reinvite no streams were added
-            lg(Warning) << "Unable to create reinvite when adding streams";
+            operationCookie->setException(ExceptionWrapperPtr(new ExceptionWrapper(e)));
+            assert(false);
         }
-        
-        mCb->ice_response(StreamInformationDict());
-
         return Complete;
     }
 
@@ -778,38 +839,50 @@ public:
 
     SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
     {
-        // Is this a retry for an operation we're already processing?
-        if (!mImplPriv->mOperationContextCache->addOperationContext(mOperationContext))
+        AMDContextData<AMD_SessionController_removeStreamsPtr>::ptr_type operationCookie = 
+            getContext<AMDContextData<AMD_SessionController_removeStreamsPtr> >(
+                mImplPriv->mOperationContextCache,
+                mOperationContext,
+                mCb);
+        if (!operationCookie)
         {
-            lg(Debug) << "Retry of previously processed removeStates() operation detected and rejected.";
+            lg(Debug) << "removeStreamStates() detected retry for operation " << mOperationContext->id;
             return Complete;
         }
 
-        lg(Debug) << "Executing a removeStreams Operation";
-
-        pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
-
-        // If there is an outstanding transaction just set this as the answer SDP, otherwise trigger a reinvite
-        if (!mImplPriv->mInviteSession->invite_tsx)
+        try
         {
-            pjsip_tx_data *packet = NULL;
+            lg(Debug) << "Executing a removeStreams Operation";
+
+            pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
... 1264 lines suppressed ...


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list