[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Mar 21 16:16:10 CDT 2012


branch "retry_deux" has been updated
       via  65f5738603391b1e4dbfc2c40077a95db6f093e8 (commit)
       via  3bc607b2923bb5e7e48f63b94b5e9d7151a98871 (commit)
       via  b01f9cee61a9676c440322c84e384301977c3872 (commit)
       via  7b89c082364c2015f0576d7b4165179aadcfa22d (commit)
       via  ec893489b857a218ca5c2315e6e0d4f7e926a02e (commit)
       via  75c04280f9f1c35bf3927dc83350870ddeee2395 (commit)
       via  d613fcbc05fdaa1f05adc1bb4e62a935bd0efa37 (commit)
       via  3d96dc7671b99f124b9cf984886039ba578962c7 (commit)
       via  cc7adc199845c220745606844030d45978a7b251 (commit)
       via  31946020d908b744f0969aae1704039e6b80b24a (commit)
       via  fff860e3c1b6e62994bfcc9eefbfe0c167316874 (commit)
       via  6309c8b41438836fe368dca74a455d6ce0cc1034 (commit)
       via  dc372a69a62dc2cff4be7fccd5f34e7d4e40b1f7 (commit)
       via  aeeaed86fcbc97a9492a60242870eef57a149f0e (commit)
       via  3c93c0a3ad074ed810e4fe3f17de7fb1f9306ce7 (commit)
       via  78ac17239d10105250ba51419264713897baedee (commit)
      from  c4096313bab94a3de1df64f8838acf36281ddbcc (commit)

Summary of changes:
 config/test_bridging.conf                          |    4 +
 .../BridgeService/BridgeReplicatorIf.ice           |   13 +-
 src/BridgeCreationExtensionPointImpl.cpp           |  113 +-
 src/BridgeCreationExtensionPointImpl.h             |    3 +-
 src/BridgeImpl.cpp                                 | 1724 +++++++++++++-------
 src/BridgeImpl.h                                   |    9 +-
 src/BridgeListenerMgr.cpp                          |   33 +-
 src/BridgeListenerMgr.h                            |   11 +
 src/BridgeManagerImpl.cpp                          |  703 ++++++---
 src/BridgeManagerImpl.h                            |    2 +-
 src/BridgeManagerListenerMgr.cpp                   |   23 +-
 src/BridgeManagerListenerMgr.h                     |    5 +
 src/BridgePartyIdExtensionPoint.cpp                |  271 +++-
 src/BridgePartyIdExtensionPoint.h                  |   31 +-
 src/BridgeReplicatorService.cpp                    |  210 ---
 src/BridgeReplicatorStateListenerI.cpp             |  122 +-
 src/BridgeReplicatorStateListenerI.h               |    5 +-
 src/BridgeServiceConfig.h                          |   27 +-
 src/CMakeLists.txt                                 |    8 +-
 src/Component.cpp                                  |   19 +-
 src/ComponentStateReplicator.cpp                   |  117 ++
 src/ExceptionWrapper.h                             |  110 ++
 src/MediaMixer.cpp                                 |   19 +-
 src/MediaSplicer.cpp                               |   42 +-
 src/OperationMonitor.cpp                           |   86 +
 src/OperationMonitor.h                             |  488 ++++++
 ...atorStateListenerI.h => ReplicatorSmartProxy.h} |   30 +-
 src/ServiceUtil.h                                  |    6 +-
 src/SessionCollection.cpp                          |   53 +-
 src/SessionCollection.h                            |   21 +-
 src/SessionListener.cpp                            |  579 ++++---
 src/SessionListener.h                              |   50 +-
 src/SessionOperations.cpp                          |   33 +-
 src/SessionOperations.h                            |   42 +-
 src/SessionWrapper.cpp                             |  121 +-
 src/SessionWrapper.h                               |   49 +-
 src/Tasks.h                                        |    5 +
 test/BridgeListenerI.cpp                           |   15 +-
 test/BridgeListenerI.h                             |   14 +-
 test/BridgeManagerListenerI.cpp                    |   12 +-
 test/BridgeManagerListenerI.h                      |    9 +-
 test/CMakeLists.txt                                |    7 +-
 test/TestBridging.cpp                              |  262 ++--
 test/UnitTests.cpp                                 |   69 +-
 44 files changed, 3929 insertions(+), 1646 deletions(-)
 mode change 100644 => 100755 src/BridgeManagerImpl.cpp
 delete mode 100644 src/BridgeReplicatorService.cpp
 create mode 100644 src/ComponentStateReplicator.cpp
 create mode 100755 src/ExceptionWrapper.h
 create mode 100755 src/OperationMonitor.cpp
 create mode 100755 src/OperationMonitor.h
 copy src/{BridgeReplicatorStateListenerI.h => ReplicatorSmartProxy.h} (55%)
 mode change 100644 => 100755
 mode change 100644 => 100755 src/SessionOperations.h
 mode change 100644 => 100755 src/SessionWrapper.cpp
 mode change 100644 => 100755 src/SessionWrapper.h


- Log -----------------------------------------------------------------
commit 65f5738603391b1e4dbfc2c40077a95db6f093e8
Merge: c409631 3bc607b
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Mar 21 18:08:35 2012 -0230

    Merge branch 'operation-context-propagation' into retry_deux


commit 3bc607b2923bb5e7e48f63b94b5e9d7151a98871
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Mar 21 17:57:52 2012 -0230

    A *lot* of the OperationContext changes.

diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 3dbe763..7f08258 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -20,6 +20,7 @@
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsExtensionPointsIf.ice>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
 #include <AsteriskSCF/Media/MediaIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
 
 module AsteriskSCF
 {
@@ -236,8 +237,8 @@ class BridgeListenerStateItem extends ReplicatedStateItem
  **/
 interface ReplicatorListener
 {
-    void stateRemoved(Ice::StringSeq itemKeys);
-    void stateSet(ReplicatedStateItemSeq items);
+    void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+    void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatedStateItemSeq items);
 };
 
 /**
@@ -248,11 +249,11 @@ interface ReplicatorListener
  **/
 interface Replicator
 {
-    void addListener(ReplicatorListener* listener);
-    void removeListener(ReplicatorListener* listener);
+    void addListener(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatorListener* listener);
+    void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatorListener* listener);
 
-    void setState(ReplicatedStateItemSeq items);
-    void removeState(Ice::StringSeq items);
+    void setState(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatedStateItemSeq items);
+    void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
 
     idempotent ReplicatedStateItemSeq getState(Ice::StringSeq itemKeys);
     idempotent ReplicatedStateItemSeq getAllState();
diff --git a/src/BridgeCreationExtensionPointImpl.cpp b/src/BridgeCreationExtensionPointImpl.cpp
index 172321f..73d0367 100755
--- a/src/BridgeCreationExtensionPointImpl.cpp
+++ b/src/BridgeCreationExtensionPointImpl.cpp
@@ -17,6 +17,9 @@
 #include "BridgeCreationExtensionPointImpl.h"
 #include <boost/thread/shared_mutex.hpp>
 #include "BridgeServiceConfig.h"
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include "OperationMonitor.h"
 
 using namespace AsteriskSCF::BridgeService;
 using namespace AsteriskSCF::SessionCommunications::V1;
@@ -30,46 +33,97 @@ namespace
     {
     public:
         BridgeExtPtImpl(const Logger& logger) :
-            mLogger(logger)
+            mLogger(logger),
+            mContextCache(AsteriskSCF::Operations::OperationContextCache::create(60)) /* XXX make configurable */
         {
         }
 
-        void addHook(const BridgeCreationHookPrx& newHook, const Ice::Current& current)
+        void addHook(const AsteriskSCF::System::V1::OperationContextPtr& context, const 
+            BridgeCreationHookPrx& newHook, const Ice::Current& current)
         {
-            UniqueLock lock(mLock);
-            BridgeCreationHookSeq::iterator i = 
-                find_if(mHooks.begin(), mHooks.end(), IdentityComparePredicate<BridgeCreationHookPrx>(newHook));
-            if (i != mHooks.end())
-            {
-                mLogger(Trace) << "refreshing creation hook " << newHook << " on " 
-                    << objectIdFromCurrent(current);
-                //
-                // Refresh the proxy if it is already in the vector.
-                // 
-                *i = newHook;
-            }
-            else
+            ContextDataPtr data(checkAndThrow(mContextCache, context));
+            if (data)
             {
-                mLogger(Trace) << "adding creation hook " << newHook << " on " 
-                    << objectIdFromCurrent(current);
-                mHooks.push_back(newHook);
+                try
+                {
+                    UniqueLock lock(mLock);
+                    BridgeCreationHookSeq::iterator i = 
+                        find_if(mHooks.begin(), mHooks.end(), IdentityComparePredicate<BridgeCreationHookPrx>(newHook));
+                    if (i != mHooks.end())
+                    {
+                        mLogger(Trace) << "refreshing creation hook " << newHook << " on " 
+                                       << objectIdFromCurrent(current);
+                        //
+                        // Refresh the proxy if it is already in the vector.
+                        // 
+                        *i = newHook;
+                    }
+                    else
+                    {
+                        mLogger(Trace) << "adding creation hook " << newHook << " on " 
+                                       << objectIdFromCurrent(current);
+                        mHooks.push_back(newHook);
+                    }
+                    data->getMonitor()->setCompleted();
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (const std::exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (...)
+                {
+                    data->setException(ExceptionWrapper::create("Unknown unexpected exception"));
+                    throw;
+                }
             }
         }
 
-        void removeHook(const BridgeCreationHookPrx& hookToRemove, const Ice::Current& current)
+        void removeHook(const AsteriskSCF::System::V1::OperationContextPtr& context, 
+            const BridgeCreationHookPrx& hookToRemove, const Ice::Current& current)
         {
-            UniqueLock lock(mLock);
-            mLogger(Trace) << "removing creation hook " << hookToRemove << " on " 
-                << objectIdFromCurrent(current);
-            mHooks.erase(remove_if(mHooks.begin(), mHooks.end(), 
-                    IdentityComparePredicate<BridgeCreationHookPrx>(hookToRemove)), mHooks.end());
+            ContextDataPtr data(checkAndThrow(mContextCache, context));
+            if (data)
+            {
+                try
+                {
+                    UniqueLock lock(mLock);
+                    mLogger(Trace) << "removing creation hook " << hookToRemove << " on " 
+                                   << objectIdFromCurrent(current);
+                    mHooks.erase(remove_if(mHooks.begin(), mHooks.end(), 
+                            IdentityComparePredicate<BridgeCreationHookPrx>(hookToRemove)), mHooks.end());
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (const std::exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (...)
+                {
+                    data->setException(ExceptionWrapper::create("Unknown unexpected exception"));
+                    throw;
+                }
+            }
         }
 
-        void clearHooks(const Ice::Current& current)
+        void clearHooks(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current)
         {
-            UniqueLock lock(mLock);
-            mLogger(Trace) << "clearing hooks from " << objectIdFromCurrent(current);
-            mHooks.clear();
+            if (mContextCache->addOperationContext(context))
+            {
+                UniqueLock lock(mLock);
+                mLogger(Trace) << "clearing hooks from " << objectIdFromCurrent(current);
+                mHooks.clear();
+            }
         }
 
         BridgeCreationHookDataPtr runHooks(const BridgeCreationHookDataPtr& originalData)
@@ -83,7 +137,7 @@ namespace
                 try
                 {
                     BridgeCreationHookDataPtr resultData; 
-                    HookResult result = (*i)->execute(tokenData, resultData);
+                    HookResult result = (*i)->execute(AsteriskSCF::Operations::createContext(), tokenData, resultData);
                     if (result.status == AsteriskSCF::System::Hook::V1::Succeeded)
                     {
                         tokenData = resultData;
@@ -110,6 +164,7 @@ namespace
 
         Logger mLogger;
         BridgeCreationHookSeq mHooks;
+        AsteriskSCF::Operations::OperationContextCachePtr mContextCache;
 
         void removeHooks(const BridgeCreationHookSeq& hooks)
         {
diff --git a/src/BridgeCreationExtensionPointImpl.h b/src/BridgeCreationExtensionPointImpl.h
index 5cc6c6a..b3b2cfc 100755
--- a/src/BridgeCreationExtensionPointImpl.h
+++ b/src/BridgeCreationExtensionPointImpl.h
@@ -38,7 +38,8 @@ namespace BridgeService
     /**
      * Base class for internal methods.
      */
-    class BridgeCreationExtensionPointImpl : public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::BridgeCreationExtensionPoint
+    class BridgeCreationExtensionPointImpl :
+        public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::BridgeCreationExtensionPoint
     {
     public:
 
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index b885f84..ccb2454 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -30,8 +30,9 @@
 #include "SessionWrapper.h"
 #include "SessionOperations.h"
 #include "SessionListener.h"
-#include <AsteriskSCF/Helpers/OperationContextCache.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include "OperationMonitor.h"
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
@@ -44,37 +45,29 @@ using namespace std;
 
 /**
  *
- * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
- * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
- * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
- * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
- * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
- * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
- * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
- * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
- * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
- * refactoring for replication could have occurred in that development branch. However, replication is a separate task
- * and it is desirable to have a *relevant* review of it independently of asynchronous support.
+ * NOTE: Once asynchronous invocations began to be added to the mix, several
+ * shortcomings of both the initial implementation of the bridge and its
+ * replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions.
+ * When converting these operations to AMD/AMI, the serial approach completely
+ * falls apart, especially when replication is thrown into the mix. For
+ * example, in a typical AMD/AMI scenario, the setBridge() and media allocation
+ * operations would occur as AMI requests. The apparent outcome will be that
+ * the participants will "appear" on the call in the order in which the
+ * operations complete (i.e. randomly). Indeed it is possible that some will
+ * ultimately fail and some sessions might actually be removed from the bridge
+ * before they are able to complete. Reconciling the completely asynchronous
+ * nature to an implementation is just too awful. As asynchronous support in
+ * bridging is the driving factor behind these changes, the refactoring for
+ * replication could have occurred in that development branch. However,
+ * replication is a separate task and it is desirable to have a *relevant*
+ * review of it independently of asynchronous support.
  *
  */
 
-namespace Ice
-{
-typedef IceUtil::Handle<AMDCallback> AMDCallbackPtr;
-}
-
 namespace
 {
 
-class GenericAMDCallbackIf : virtual public AsteriskSCF::BridgeService::QueuedTask
-{
-public:
-    virtual ~GenericAMDCallbackIf() {}
-    virtual Ice::AMDCallbackPtr setCallback(const Ice::AMDCallbackPtr& cb) = 0;
-};
-
-typedef IceUtil::Handle<GenericAMDCallbackIf> GenericAMDCallbackIfPtr;
-
 class SessionsTracker : public IceUtil::Shared
 {
 public:
@@ -132,71 +125,6 @@ private:
 typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
 
 
-template <class T>
-class GenericAMDCallback : public GenericAMDCallbackIf
-{
-public:
-    GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
-        QueuedTask("GenericAMDCallback"),
-        mCallback(cb),
-        mTracker(tracker)
-    {
-    }
-
-    Ice::AMDCallbackPtr setCallback(const Ice::AMDCallbackPtr& cb)
-    {
-        T updatedCallback = T::dynamicCast(cb);
-        assert(updatedCallback);
-        if (updatedCallback)
-        {
-            boost::lock_guard<boost::mutex> lock(mMutex);
-            if (!mCallback)
-            {
-                //
-                // Operation has already been completed.
-                //
-                return 0;
-            }
-            Ice::AMDCallbackPtr t = Ice::AMDCallbackPtr::dynamicCast(mCallback);
-            assert(t);
-            mCallback = T::dynamicCast(cb);
-            assert(mCallback);
-            return t;
-        }
-        return 0;
-    }
-   
-protected:
-
-    bool executeImpl()
-    {
-        boost::lock_guard<boost::mutex> lock(mMutex);
-        mCallback->ice_response();
-        mCallback = 0;
-        return true;
-    }
-
-    void failImpl()
-    {
-        boost::lock_guard<boost::mutex> lock(mMutex);
-        SessionErrorSeq errors(mTracker->getExceptions());
-        if (!errors.empty())
-        {
-            mCallback->ice_exception(BridgeSessionOperationFailed(errors));
-        }
-        else
-        {
-            mCallback->ice_exception();
-        }
-        mCallback = 0;
-    }
-
-private:
-    boost::mutex mMutex;
-    T mCallback;
-    SessionsTrackerPtr mTracker;
-};
-
 /**
  * 
  * BridgeImpl is a reference implmentation of the AsteriskSCF::Bridging::V1::Bridge
@@ -228,8 +156,8 @@ public:
         const Ice::Current&);
 
     SessionSeq listSessions(const Ice::Current&);
-    void shutdown(const Ice::Current& current);
-    void destroy(const Ice::Current& current);
+    void shutdown(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current);
+    void destroy(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current);
 
     void addListener(const AsteriskSCF::System::V1::OperationContextPtr&,
         const BridgeListenerPrx& listener, const Ice::Current& current);
@@ -271,15 +199,21 @@ public:
 
     void forceUpdate();
 
-    void getAddSessionsTasks(QueuedTasks& tasks, const SessionPrx& source, const SessionSeq& sessions, const CallerPtr& callerID, const RedirectionsPtr& redirects);
+    void getAddSessionsTasks(QueuedTasks& tasks, 
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const SessionPrx& source, const SessionSeq& sessions,
+        const CallerPtr& callerID, const RedirectionsPtr& redirects);
 
-    PartyIdHooksPtr getPartyIdHooks()
+    PartyIdHooksPtr getPartyIdHooks() const
     {
         return mState->partyIdHookSet;
     }
 
     BridgeCookies getCookies()
     {
+        //
+        // TODO: use transform?
+        //
         BridgeCookies result;
         for (BridgeCookieDict::const_iterator i = mState->cookies.begin(); i != mState->cookies.end(); ++i)
         {
@@ -288,8 +222,12 @@ public:
         return result;
     }
 
-    void updateConnectedLine(const SessionWrapperPtr& sourceSession, const ConnectedLinePtr& connectedLine);
-    void updateRedirections(const SessionWrapperPtr& sourceSession, const RedirectionsPtr& redirections);
+    void updateConnectedLine(const SessionWrapperPtr& sourceSession, 
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const ConnectedLinePtr& connectedLine);
+    void updateRedirections(const SessionWrapperPtr& sourceSession, 
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const RedirectionsPtr& redirections);
 
 private:
 
@@ -332,7 +270,7 @@ private:
     //
     Logger mLogger;
 
-    AsteriskSCF::Helpers::OperationContextCachePtr mOperationContextCache;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
 
     void statePreCheck();
     BridgeStateItemPtr createUpdate();
@@ -348,7 +286,7 @@ private:
 typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
 
 //
-// simply checks for nulls at this point.
+// Simply checks for nulls at this point.
 //
 static void checkSessions(const SessionSeq& sessions)
 {
@@ -381,11 +319,14 @@ static void checkSessions(const SessionSeq& sessions)
 class ForwardRedirectionsUpdatedTask : public QueuedTask
 {
 public:
-    ForwardRedirectionsUpdatedTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& sourceSession,
-            const RedirectionsPtr& redirections,
-            const Logger& logger) :
+    ForwardRedirectionsUpdatedTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& sourceSession,
+        const RedirectionsPtr& redirections,
+        const Logger& logger) :
         QueuedTask("ForwardRedirectionsUpdatedTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSourceSession(sourceSession),
         mRedirections(redirections),
@@ -404,17 +345,25 @@ protected:
         }
         try
         {
-
+            //
             // Forward the ConnectedLine to each bridged session.
+            //
             SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
             for(SessionSeq::iterator i = sessions.begin();
                 i != sessions.end(); ++i)
             {
+                //
+                // TODO: This looks like a SessionOperation.
+                //
                 if (!(*i) || (*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
                 {
                     continue;
                 }
 
+                //
+                // TODO: Why doesn't this loop simply use the sequence of SessionWrappers you
+                // can retrieve from the SessionCollection?
+                //
                 SessionWrapperPtr destSessionWrapper = mBridge->getSessions()->getSession(*i);
                 if (destSessionWrapper)
                 {
@@ -452,7 +401,8 @@ protected:
                 try
                 {
                     // Apply this hook. 
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(mSourceSession,
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(
+                        AsteriskSCF::Operations::createContext(mRootContext), mSourceSession,
                         destinationSession->getSession(),
                         currentRedirections, destSpecificRedirections);
 
@@ -472,11 +422,12 @@ protected:
         if (sessionController)
         {
             // Forward the info via the SessionController for this session.
-            sessionController->updateRedirections(AsteriskSCF::createContext(), currentRedirections);
+            sessionController->updateRedirections(AsteriskSCF::Operations::createContext(mRootContext), currentRedirections);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSourceSession;
     RedirectionsPtr mRedirections;
@@ -498,10 +449,13 @@ private:
 class ForwardConnectedLineTask : public QueuedTask
 {
 public:
-    ForwardConnectedLineTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& sourceSession,
-            const Logger& logger) :
+    ForwardConnectedLineTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& sourceSession,
+        const Logger& logger) :
         QueuedTask("ForwardConnectedLineTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSourceSession(sourceSession),
         mLogger(logger)
@@ -529,7 +483,10 @@ protected:
                 return true; 
             }
 
+            //
             // Forward the ConnectedLine to each bridged session.
+            // TODO: Implement in terms of an operation to be used with the visitor.
+            //
             SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
             for(SessionSeq::iterator i = sessions.begin();
                 i != sessions.end(); ++i)
@@ -577,7 +534,9 @@ protected:
                 try
                 {
                     // Apply a hook
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(mSourceSession,
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(
+                        AsteriskSCF::Operations::createContext(mRootContext),
+                        mSourceSession,
                         destinationSession->getSession(),
                         currentConnectedLine, destSpecificConnectedLine);
 
@@ -597,11 +556,12 @@ protected:
         if (sessionController)
         {
             // Forward the info via the SessionController for this session.
-            sessionController->updateConnectedLine(AsteriskSCF::createContext(), currentConnectedLine);
+            sessionController->updateConnectedLine(AsteriskSCF::Operations::createContext(mRootContext), currentConnectedLine);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSourceSession;
     Logger mLogger;
@@ -610,11 +570,14 @@ private:
 class ForwardCallerIDTask : public QueuedTask
 {
 public:
-    ForwardCallerIDTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& source,
-            const CallerPtr& callerID,
-            const Logger& logger) :
+    ForwardCallerIDTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& source,
+        const CallerPtr& callerID,
+        const Logger& logger) :
         QueuedTask("ForwardConnectedLineTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSource(source),
         mCallerID(callerID),
@@ -683,7 +646,9 @@ protected:
                 try
                 {
                     // Apply a hook
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(mSource,
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(
+                        AsteriskSCF::Operations::createContext(mRootContext), 
+                        mSource,
                         destinationSession->getSession(),
                         currentCallerID, destSpecificCallerID);
 
@@ -704,11 +669,12 @@ protected:
         SessionControllerPrx sessionController =  destinationSession->getSessionController();
         if (sessionController)
         {
-            sessionController->updateCallerID(AsteriskSCF::createContext(), currentCallerID);
+            sessionController->updateCallerID(AsteriskSCF::Operations::createContext(mRootContext), currentCallerID);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSource;
     CallerPtr mCallerID;
@@ -722,11 +688,14 @@ private:
 class UpdateConnectedLineTask : public QueuedTask
 {
 public:
-    UpdateConnectedLineTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& sourceSession,
-            const ConnectedLinePtr& connectedLine,
-            const Logger& logger) :
+    UpdateConnectedLineTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& sourceSession,
+        const ConnectedLinePtr& connectedLine,
+        const Logger& logger) :
         QueuedTask("UpdateConnectedLineTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSourceSession(sourceSession),
         mConnectedLine(connectedLine),
@@ -758,13 +727,27 @@ protected:
         if (partyIdHooks)
         {
             // Allow the ReceivedConnectedLinePartyId hooks to alter the ConnectedLine record. 
-            for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->receivedConnectedLineHooks.begin();
+            for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i =
+                    partyIdHooks->receivedConnectedLineHooks.begin();
                 i != partyIdHooks->receivedConnectedLineHooks.end(); ++i)
             {
                 try
                 {
-                    // Apply this hook. 
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(mSourceSession,
+                    //
+                    // It is quite possible that this can never
+                    // happen, but it is prudent to check all the
+                    // same.
+                    //
+                    if ((*i) == 0)
+                    {
+                        continue;
+                    }
+                    
+                    //
+                    // Apply this hook.
+                    //
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(
+                        AsteriskSCF::Operations::createContext(mRootContext), mSourceSession,
                         currentConnectedLine, updatedConnectedLine);
 
                     if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
@@ -780,7 +763,9 @@ protected:
         }
 
         //
-        // Cache this value.
+        // Cache this value.  TODO: can currentConnectedLine be 0 and
+        // if so, does it make sense to set it? The way
+        // setConnectedLine is written, it won't be set again.
         //
         wrapper->setConnectedLine(currentConnectedLine);
 
@@ -788,6 +773,7 @@ protected:
     }
            
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSourceSession;
     ConnectedLinePtr mConnectedLine;
@@ -797,9 +783,12 @@ private:
 class RemoveSessionsNotify : public QueuedTask
 {
 public:
-    RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
-            const SessionsTrackerPtr& tracker, const BridgeCookies& cookies) :
+    RemoveSessionsNotify(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeListenerMgrPtr& bridgeListeners,
+        const SessionsTrackerPtr& tracker, const BridgeCookies& cookies) :
         QueuedTask("RemoveSessionsNotify"),
+        mRootContext(rootContext),
         mBridgeListeners(bridgeListeners),
         mTracker(tracker),
         mCookies(cookies)
@@ -812,12 +801,13 @@ protected:
         SessionSeq sessions = mTracker->getSessions();
         if (!sessions.empty())
         {
-            mBridgeListeners->sessionsRemoved(sessions, mCookies);
+            mBridgeListeners->sessionsRemoved(mRootContext, sessions, mCookies);
         }
         return true;
     }
            
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeListenerMgrPtr mBridgeListeners;
     SessionsTrackerPtr mTracker;
     BridgeCookies mCookies;
@@ -826,9 +816,11 @@ private:
 class SetBridgeTask : public QueuedTask
 {
 public:
-    SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+    SetBridgeTask(const AsteriskSCF::System::V1::OperationContextPtr& rootContext, 
+            const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
             const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
         QueuedTask("SetBridgeTask"),
+        mRootContext(rootContext),
         mSessionManager(sessionCollection),
         mBridge(bridge),
         mSessionListener(listener),
@@ -853,7 +845,7 @@ protected:
                 continue;
             }
             tasksDispatched = true;
-            (*i)->begin_setBridge(mBridge, mSessionListener,
+            (*i)->begin_setBridge(AsteriskSCF::Operations::createContext(mRootContext), mBridge, mSessionListener,
                     newCallback_Session_setBridge(this, &SetBridgeTask::set,
                             &SetBridgeTask::failed), session);
         }
@@ -912,6 +904,7 @@ protected:
     }
     
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     SessionCollectionPtr mSessionManager;
     BridgePrx mBridge;
     SessionListenerPrx mSessionListener;
@@ -919,6 +912,15 @@ private:
     SessionsTrackerPtr mTracker;
 };
 
+typedef AMDContextResultData<
+    AsteriskSCF::Media::V1::StreamInformationDict,
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr> AddStreamsContextData;
+typedef boost::shared_ptr<AddStreamsContextData> AddStreamsContextDataPtr;
+
+typedef AMDContextData<
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr> RemoveStreamsContextData;
+typedef boost::shared_ptr<RemoveStreamsContextData> RemoveStreamsContextDataPtr;
+
 class BridgeSessionController : public SessionController
 {
 public:
@@ -927,8 +929,7 @@ public:
         mBridge(bridge), 
         mSelf(self), 
         mLogger(logger),
-        mOperationCache(new AsteriskSCF::Helpers::OperationContextCache(
-                AsteriskSCF::Helpers::OperationCacheDefaultTTL))
+        mOperationCache(AsteriskSCF::Operations::OperationContextCache::create(60))
     { 
     }
 
@@ -945,46 +946,94 @@ public:
         const AsteriskSCF::System::V1::OperationContextPtr& context,
         const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
     {
-        if (!mOperationCache->addOperationContext(context))
+        try
         {
-            //
-            // XXXX there needs to be away to give a good response here!!!
-            //
-            cb->ice_response(AsteriskSCF::Media::V1::StreamInformationDict());
-            return;
+            AddStreamsContextDataPtr data(
+                getContext<AddStreamsContextDataPtr,
+                AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr>(mOperationCache,
+                    context, cb));
+            if (data)
+            {
+                try
+                {
+                    AddStreamsOperationPtr op = new AddStreamsOperation(data->getProxy(), mSelf, streams, mLogger);
+                    mBridge->getSessions()->visitSessions(*op);
+                }
+                catch (const std::exception& ex)
+                {
+                    data->getProxy()->ice_exception(ex);
+                }
+                catch (...)
+                {
+                    data->getProxy()->ice_exception();
+                }
+            }
+        }
+        catch (const std::exception& ex)
+        {
+            cb->ice_exception(ex);
+        }
+        catch (...)
+        {
+            cb->ice_exception();
         }
-        AddStreamsOperationPtr op = new AddStreamsOperation(cb, mSelf, streams, mLogger);
-        mBridge->getSessions()->visitSessions(*op);
     }
 
     void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
         const AsteriskSCF::System::V1::OperationContextPtr& context,
         const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
     {
-        if (!mOperationCache->addOperationContext(context))
+        try
         {
-            //
-            // Silently swallow and ignore the duplicate request.
-            //
-            cb->ice_response();
-            return;
+            RemoveStreamsContextDataPtr data(
+                getContext<RemoveStreamsContextDataPtr,
+                AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr>(
+                    mOperationCache, context, cb));
+
+            if (data)
+            {
+                RemoveStreamsOperation op(mSelf, streams);
+                mBridge->getSessions()->visitSessions(op);
+                data->getProxy()->ice_response();
+            }
+        }
+        catch (const std::exception& ex)
+        {
+            cb->ice_exception(ex);
+        }
+        catch (...)
+        {
+            cb->ice_exception();
         }
-        RemoveStreamsOperation op(mSelf, streams);
-        mBridge->getSessions()->visitSessions(op);
-        cb->ice_response();
     }
 
     void updateConnectedLine(const AsteriskSCF::System::V1::OperationContextPtr& context,
         const ConnectedLinePtr& connectedLine, const Ice::Current&)
     {
-        if (!mOperationCache->addOperationContext(context))
+        ContextDataPtr data(checkAndThrow(mOperationCache, context));
+        if (data)
         {
-            //
-            // Silently swallow and ignore the duplicate request.
-            //
-            return;
+            try
+            {
+                mBridge->updateConnectedLine(mSelf, context, connectedLine);
+                data->getMonitor()->setCompleted();
+            }
+            catch (const Ice::Exception& ex)
+            {
+                data->setException(ExceptionWrapper::create(ex));
+                throw;
+            }
+            catch (const std::exception& ex)
+            {
+                data->setException(ExceptionWrapper::create(ex));
+                throw;
+            }
+            catch (...)
+            {
+                data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+                throw;
+            }
         }
-        mBridge->updateConnectedLine(mSelf, connectedLine);
     }
 
     void updateCallerID(const AsteriskSCF::System::V1::OperationContextPtr&, const CallerPtr&, const Ice::Current&)
@@ -999,32 +1048,51 @@ public:
     void updateRedirections(const AsteriskSCF::System::V1::OperationContextPtr& context,
         const RedirectionsPtr& redirections, const ::Ice::Current&) 
     {
-        if (!mOperationCache->addOperationContext(context))
+        ContextDataPtr data(checkAndThrow(mOperationCache, context));
+        if (data)
         {
-            //
-            // Silently swallow and ignore the duplicate request.
-            //
-            return;
+            try
+            {
+                mBridge->updateRedirections(mSelf, context, redirections);
+                data->getMonitor()->setCompleted();
+            }
+            catch (const Ice::Exception& ex)
+            {
+                data->setException(ExceptionWrapper::create(ex));
+                throw;
+            }
+            catch (const std::exception& ex)
+            {
+                data->setException(ExceptionWrapper::create(ex));
+                throw;
+            }
+            catch (...)
+            {
+                data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+                throw;
+            }
         }
-        mBridge->updateRedirections(mSelf, redirections);
     }
 
 private:
     BridgeImplPtr mBridge;
     SessionWrapperPtr mSelf;
     Logger mLogger;
-    AsteriskSCF::Helpers::OperationContextCachePtr mOperationCache;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationCache;
 };
 
 class SetAndGetSessionControllerTask : public QueuedTask
 {
 public:
-    SetAndGetSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
-                                   const SessionCollectionPtr& sessionCollection,
-                                   const SessionSeq& sessions,
-                                   const BridgeImplPtr& bridge,
-                                   const Logger& logger):
+    SetAndGetSessionControllerTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const Ice::ObjectAdapterPtr& adapter,
+        const SessionCollectionPtr& sessionCollection,
+        const SessionSeq& sessions,
+        const BridgeImplPtr& bridge,
+        const Logger& logger):
         QueuedTask("SetAndGetSessionControllerTask"),
+        mRootContext(rootContext),
         mAdapter(adapter),
         mSessionManager(sessionCollection),
         mSessions(sessions),
@@ -1054,9 +1122,12 @@ protected:
 	        std::string identity = (*i)->ice_getIdentity().name;
 	        identity += ".bridgecontroller";
 	        SessionControllerPrx controllerPrx =
-		    SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
-	        (*i)->begin_setAndGetSessionController(controllerPrx, newCallback_Session_setAndGetSessionController(this,
-                                                   &SetAndGetSessionControllerTask::get, &SetAndGetSessionControllerTask::failed), session);
+                    SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
+
+                (*i)->begin_setAndGetSessionController(AsteriskSCF::Operations::createContext(mRootContext), 
+                    controllerPrx, 
+                    newCallback_Session_setAndGetSessionController(this, &SetAndGetSessionControllerTask::get, 
+                    &SetAndGetSessionControllerTask::failed), session);
         }
         return !tasksDispatched;
     }
@@ -1107,6 +1178,7 @@ protected:
     }
     
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     Ice::ObjectAdapterPtr mAdapter;
     SessionCollectionPtr mSessionManager;
     SessionSeq mSessions;
@@ -1118,10 +1190,13 @@ private:
 class RemoveSessionControllerTask : public QueuedTask
 {
 public:
-    RemoveSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
+    RemoveSessionControllerTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const Ice::ObjectAdapterPtr& adapter,
 	const SessionsTrackerPtr& sessions,
 	const Logger& logger):
         QueuedTask("RemoveSessionControllerTask"),
+        mRootContext(rootContext),
         mAdapter(adapter),
         mSessions(sessions),
         mLogger(logger)
@@ -1150,12 +1225,13 @@ protected:
             // Remove the session controller from the session itself
             SessionControllerPrx controller = SessionControllerPrx::uncheckedCast(mAdapter->createProxy(
                                                                                       mAdapter->getCommunicator()->stringToIdentity(identity)));
-            (*i)->removeSessionController(controller);
+            (*i)->removeSessionController(AsteriskSCF::Operations::createContext(mRootContext), controller);
         }
         return true;
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     Ice::ObjectAdapterPtr mAdapter;
     SessionsTrackerPtr mSessions;
     Logger mLogger;
@@ -1164,11 +1240,13 @@ private:
 class UnplugMedia : public QueuedTask
 {
 public:
-    UnplugMedia(const BridgeImplPtr& bridge) :
+    UnplugMedia(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge) :
         QueuedTask("UnplugMedia"),
+        mRootContext(rootContext),
         mBridge(bridge)
-        {
-        }
+    {
+    }
 
 protected:
     bool executeImpl()
@@ -1179,18 +1257,21 @@ protected:
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
 };
 
 class SetupMedia : public QueuedTask
 {
 public:
-    SetupMedia(const BridgeImplPtr& bridge) :
+    SetupMedia(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge) :
         QueuedTask("SetupMedia"),
+        mRootContext(rootContext),
         mBridge(bridge)
-        {
-        }
-
+    {
+    }
+    
 protected:
     bool executeImpl()
     {
@@ -1200,15 +1281,19 @@ protected:
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
 };
 
 class AddToListeners : public QueuedTask
 {
 public:
-    AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker,
+    AddToListeners(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker,
         const BridgeCookies& cookies) :
         QueuedTask("AddToListeners"),
+        mRootContext(rootContext),
         mListeners(listeners),
         mTracker(tracker),
         mCookies(cookies)
@@ -1223,6 +1308,7 @@ protected:
     }
     
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeListenerMgrPtr mListeners;
     SessionsTrackerPtr mTracker;
     BridgeCookies mCookies;
@@ -1243,8 +1329,9 @@ protected:
     {
         if (mBridge->getSessions()->size() < 2 && mPrx)
         {
-            mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
-                            &CheckShutdown::failed));
+            mPrx->begin_shutdown(AsteriskSCF::Operations::createContext(),
+                newCallback_Bridge_shutdown(this, &CheckShutdown::done,
+                    &CheckShutdown::failed));
         }
         //
         // We don't care about the result really. The CheckShutdown instance will hang
@@ -1277,8 +1364,10 @@ private:
 class UpdateTask : public QueuedTask
 {
 public:
-    UpdateTask(const BridgeImplPtr& bridge) :
+    UpdateTask(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge) :
         QueuedTask("UpdateTask"),
+        mRootContext(rootContext),
         mBridge(bridge)
     {
     }
@@ -1290,16 +1379,20 @@ protected:
         return true;
     }
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
 };
 
 class ConnectTelephonyEventsTask: public QueuedTask
 {
 public:
-    ConnectTelephonyEventsTask(const BridgeImplPtr& bridge,
-                               const SessionSeq& newSessions,
-                               const Logger& logger) 
-       : QueuedTask("ConnectTelephonyEventsTask"), 
+    ConnectTelephonyEventsTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge,
+        const SessionSeq& newSessions,
+        const Logger& logger) 
+       : QueuedTask("ConnectTelephonyEventsTask"),
+         mRootContext(rootContext),
          mBridge(bridge), 
          mNewSessions(newSessions), 
          mLogger(logger) 
@@ -1341,6 +1434,7 @@ protected:
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionSeq mNewSessions;
     Logger mLogger;
@@ -1349,10 +1443,13 @@ private:
 class DisconnectTelephonyEventsTask: public QueuedTask
 {
 public:
-    DisconnectTelephonyEventsTask(const BridgeImplPtr& bridge,
-                               const SessionSeq& disconnectingSessions,
-                               const Logger& logger) 
-       : QueuedTask("DisconnectTelephonyEventsTask"), 
+    DisconnectTelephonyEventsTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge,
+        const SessionSeq& disconnectingSessions,
+        const Logger& logger) 
+       : QueuedTask("DisconnectTelephonyEventsTask"),
+         mRootContext(rootContext),
          mBridge(bridge), 
          mDisconnectingSessions(disconnectingSessions), 
          mLogger(logger) 
@@ -1384,9 +1481,11 @@ protected:
         }
 
         /**
-         * In ConnectTelephonyEventsTask, we could assume all the sessions were in the bridge's session collection
-         * and just use a visitor. But when a session is removed, AMI operations are involved. We use an 
-         * extra pass over the set of sessions being removed, and disconnect each one from the other, to be safe. 
+         * In ConnectTelephonyEventsTask, we could assume all the sessions were
+         * in the bridge's session collection and just use a visitor. But when a
+         * session is removed, AMI operations are involved. We use an extra pass
+         * over the set of sessions being removed, and disconnect each one from
+         * the other, to be safe.
          */
         disconnectMembers();
 
@@ -1451,17 +1550,54 @@ protected:
         for(TelephonyEventSourceSeq::iterator i=fromSources.begin();   
             i != fromSources.end(); ++i)
         {
-            (*i)->removeSinks(AsteriskSCF::createContext(), sinksToRemove);
+            (*i)->removeSinks(AsteriskSCF::Operations::createContext(mRootContext), sinksToRemove);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionSeq mDisconnectingSessions;
     Logger mLogger;
 };
 
 
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+    GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+        QueuedTask("GenericAMDCallback"),
+        mCallback(cb),
+        mTracker(tracker)
+    {
+    }
+protected:
+
+    bool executeImpl()
+    {
+        mCallback->ice_response();
+        return true;
+    }
+
+    void failImpl()
+    {
+        SessionErrorSeq errors(mTracker->getExceptions());
+        if (!errors.empty())
+        {
+            mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+        }
+        else
+        {
+            mCallback->ice_exception();
+        }
+    }
+
+private:
+    T mCallback;
+    SessionsTrackerPtr mTracker;
+};
+
 } // End of anonymous namespace
 
 BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter, 
@@ -1479,7 +1615,7 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
     mReplicator(replicator),
     mSessionListener(createSessionListener(mSessions, logger)),
     mLogger(logger),
-    mOperationContextCache(new AsteriskSCF::Helpers::OperationContextCache(AsteriskSCF::Helpers::OperationCacheDefaultTTL))
+    mOperationContextCache(AsteriskSCF::Operations::OperationContextCache::create(60))
 {
     mLogger(Trace) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
     for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin(); 
@@ -1501,7 +1637,8 @@ BridgeImpl::~BridgeImpl()
 /** 
  * Process an updated ConnectedLine record from a session. 
  */
-void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, const ConnectedLinePtr& connectedLine)
+void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, 
+    const AsteriskSCF::System::V1::OperationContextPtr& context, const ConnectedLinePtr& connectedLine)
 {
      try
     {
@@ -1509,12 +1646,12 @@ void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, con
 
         // Updates the cached ConnectedLine party id information for the given session.
         //  - Applies receive hooks on the received ConnectedLine info before caching.
-        tasks.push_back(new UpdateConnectedLineTask(this, sourceSession->getSession(), connectedLine, mLogger));
+        tasks.push_back(new UpdateConnectedLineTask(context, this, sourceSession->getSession(), connectedLine, mLogger));
 
 
         // Forwards the ConnectedLine information to the other sessions in the bridge.
         //  - Applies forwarding hooks to the cached ConnectedLine info before forwarding.
-        tasks.push_back(new ForwardConnectedLineTask(this, sourceSession->getSession(), mLogger));
+        tasks.push_back(new ForwardConnectedLineTask(context, this, sourceSession->getSession(), mLogger));
 
         ExecutorPtr runner(new Executor(tasks, mLogger));
         runner->start();
@@ -1528,7 +1665,8 @@ void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, con
 /** 
  * Process an updated Redirections record from a session. 
  */
-void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, const RedirectionsPtr& redirections)
+void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, 
+    const AsteriskSCF::System::V1::OperationContextPtr& context, const RedirectionsPtr& redirections)
 {
     try
     {
@@ -1536,7 +1674,7 @@ void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, cons
 
         // Forwards the Redirections information to the other sessions in the bridge.
         // Applies forwarding hooks to the Redirecrting info before forwarding.
-        tasks.push_back(new ForwardRedirectionsUpdatedTask(this, sourceSession->getSession(), redirections, mLogger));
+        tasks.push_back(new ForwardRedirectionsUpdatedTask(context, this, sourceSession->getSession(), redirections, mLogger));
         ExecutorPtr runner(new Executor(tasks, mLogger));
         runner->start();
     }
@@ -1551,6 +1689,9 @@ static SessionPrx extractSession(const SessionWithSessionInfo& swsi)
     return swsi.sessionProxy;
 }
 
+typedef AMDContextData<AMD_Bridge_addSessionsPtr> AddSessionsContextData;
+typedef boost::shared_ptr<AddSessionsContextData> AddSessionsContextDataPtr;
+
 void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
     const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
     const SessionWithSessionInfoSeq& sessionInfos,
@@ -1558,65 +1699,86 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
 {
     try
     {
-        if (!mOperationContextCache->addOperationContext(operationContext))
-        {
-            callback->ice_response();
-            return;
-        }
+        AddSessionsContextDataPtr data(
+            getContext<AddSessionsContextDataPtr, AMD_Bridge_addSessionsPtr>(
+                mOperationContextCache, operationContext, callback));
 
-        mSessions->reap();
-        SessionSeq sessions;
-        sessions.resize(sessionInfos.size());
-        std::transform(sessionInfos.begin(), sessionInfos.end(), sessions.begin(), extractSession);
-        if (sessions.empty())
+        if (data)
         {
-            if (callback)
+            try
             {
-                callback->ice_response();
-            }
-            return;
-        }
-        checkSessions(sessions);
-        statePreCheck();
-        mLogger(Trace) << FUNLOG << ": adding " << sessions.size() << " sessions";
-        SessionsTrackerPtr tracker(new SessionsTracker);
-        
-        QueuedTasks tasks;
-        tasks.push_back(new UnplugMedia(this));
-        tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
-        tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
-        tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
+                mSessions->reap();
+                SessionSeq sessions;
+                sessions.resize(sessionInfos.size());
+                std::transform(sessionInfos.begin(), sessionInfos.end(), sessions.begin(), extractSession);
+                if (sessions.empty())
+                {
+                    if (callback)
+                    {
+                        data->getProxy()->ice_response();
+                    }
+                    return;
+                }
+                checkSessions(sessions);
+                statePreCheck();
+                mLogger(Trace) << FUNLOG << ": adding " << sessions.size() << " sessions";
+                SessionsTrackerPtr tracker(new SessionsTracker);
+            
+                QueuedTasks tasks;
+                tasks.push_back(new UnplugMedia(operationContext, this));
+                tasks.push_back(new SetBridgeTask(operationContext, mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+                tasks.push_back(new AddToListeners(operationContext, mListeners, tracker, getCookies()));
+                tasks.push_back(new SetAndGetSessionControllerTask(operationContext,
+                        mObjAdapter, mSessions, sessions, this, mLogger));
+            
+                //
+                // The new sessions being added to the bridge need to have their connected line set and
+                // forwarded to the existing sessions.
+                //
+                // TODO: This is very much like the SessionOperations which are
+                // applied with the "visit" operation.
+                //
+                for (SessionWithSessionInfoSeq::const_iterator infoIter = sessionInfos.begin();
+                     infoIter != sessionInfos.end(); ++infoIter)
+                {
+                    if (infoIter->info && infoIter->info->sessionOwner)
+                    {
+                        tasks.push_back(new UpdateConnectedLineTask(operationContext, this, infoIter->sessionProxy,
+                                new ConnectedLine(infoIter->info->sessionOwner->ids), mLogger));
+                        tasks.push_back(new ForwardConnectedLineTask(operationContext, this, infoIter->sessionProxy, mLogger));
+                    }
+                }
 
-        //The new sessions being added to the bridge need to have their connected line set and
-        //forwarded to the existing sessions.
-        for (SessionWithSessionInfoSeq::const_iterator infoIter = sessionInfos.begin(); infoIter != sessionInfos.end(); ++infoIter)
-        {
-            if (infoIter->info && infoIter->info->sessionOwner)
+                //The existing sessions need to have their stored connected line forwarded to the new
+                //sessions
+                SessionSeq existingSessions = getSessions()->getSessionSeq();
+                for (SessionSeq::const_iterator sessionIter = existingSessions.begin();
+                     sessionIter != existingSessions.end(); ++sessionIter)
+                {
+                    tasks.push_back(new ForwardConnectedLineTask(operationContext, this, *sessionIter, mLogger));
+                }
+            
+                tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(data->getProxy(), tracker));
+                tasks.push_back(new SetupMedia(operationContext, this));
+                tasks.push_back(new ConnectTelephonyEventsTask(operationContext, this, sessions, mLogger));
+                tasks.push_back(new UpdateTask(operationContext, this));
+                ExecutorPtr executor(new Executor(tasks, mLogger));
+                executor->start();
+                //
+                // When the operations have all completed, that last task will take care of handling
+                // the callback. It's all left within the try/catch in the event something happens during
+                // task startup.
+                //
+            }
+            catch (const std::exception& ex)
             {
-                tasks.push_back(new UpdateConnectedLineTask(this, infoIter->sessionProxy, new ConnectedLine(infoIter->info->sessionOwner->ids), mLogger));
-                tasks.push_back(new ForwardConnectedLineTask(this, infoIter->sessionProxy, mLogger));
+                data->getProxy()->ice_exception(ex);
+            }
+            catch (...)
+            {
+                data->getProxy()->ice_exception();
             }
         }
-
-        //The existing sessions need to have their stored connected line forwarded to the new
-        //sessions
-        SessionSeq existingSessions = getSessions()->getSessionSeq();
-        for (SessionSeq::const_iterator sessionIter = existingSessions.begin(); sessionIter != existingSessions.end(); ++sessionIter)
-        {
-            tasks.push_back(new ForwardConnectedLineTask(this, *sessionIter, mLogger));
-        }
-
-        tasks.push_back(QueuedTaskPtr::dynamicCast(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker)));
-        tasks.push_back(new SetupMedia(this));
-        tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
-        tasks.push_back(new UpdateTask(this));
-        ExecutorPtr executor(new Executor(tasks, mLogger));
-        executor->start();
-        //
-        // When the operations have all completed, that last task will take care of handling
-        // the callback. It's all left withing the try/catch in the event something happens during
-        // task startup.
-        //
     }
     catch (const std::exception& ex)
     {
@@ -1628,60 +1790,72 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
     }
 }
 
+typedef AMDContextData<AMD_Bridge_removeSessionsPtr> RemoveSessionsContextData;
+typedef boost::shared_ptr<RemoveSessionsContextData> RemoveSessionsContextDataPtr;
+
 void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback,
     const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const SessionSeq& sessions,
     const Ice::Current&)
 {
     try
     {
-        if (!mOperationContextCache->addOperationContext(operationContext))
+        RemoveSessionsContextDataPtr data(
+            getContext<RemoveSessionsContextDataPtr, AMD_Bridge_removeSessionsPtr>(
+                mOperationContextCache, operationContext, callback));
+        if (data)
         {
-            //
-            // Ignore duplicate.
-            //
-            callback->ice_response();
-            return;
-        }
-        if (sessions.empty())
-        {
-            callback->ice_response();
-            return;
-        }
-        checkSessions(sessions);
-        statePreCheck();
-        mSessions->reap();
+            try
+            {
+                if (sessions.empty())
+                {
+                    data->getProxy()->ice_response();
+                    return;
+                }
+                checkSessions(sessions);
+                statePreCheck();
+                mSessions->reap();
 
-        //
-        // The shutdown of individual sessions are implemented as series of AMI requests. Once initiated,
-        // we allow them to proceed asynchronously and do not concern ourselves with the result.
-        // The logic of shutdown should remove them either because the operations succeeded or
-        // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
-        // is pointless.
-        //
-        SessionsTrackerPtr removed(new SessionsTracker);
-        for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
-        {
-            SessionWrapperPtr session = mSessions->getSession(*i);
-            if (session)
+                //
+                // The shutdown of individual sessions is implemented as a series of AMI requests. Once initiated,
+                // we allow them to proceed asynchronously and do not concern ourselves with the result.
+                // The logic of shutdown should remove them either because the operations succeeded or
+                // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+                // is pointless.
+                //
+                SessionsTrackerPtr removed(new SessionsTracker);
+                for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
+                {
+                    SessionWrapperPtr session = mSessions->getSession(*i);
+                    if (session)
+                    {
+                        removed->add(session->getSession());
+                        mSessions->removeSession(session->getBridgedSession());
+                    }
+                }
+                QueuedTasks tasks;
+
+                BridgeCookies cookies;
+                {
+                    boost::shared_lock<boost::shared_mutex> lock(mLock);
+                    cookies = getCookies();
+                }
+                tasks.push_back(new RemoveSessionsNotify(operationContext, mListeners, removed, cookies));
+                tasks.push_back(new RemoveSessionControllerTask(operationContext, mObjAdapter, removed, mLogger));
+                tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(data->getProxy(), removed));
+                tasks.push_back(new DisconnectTelephonyEventsTask(operationContext, this, sessions, mLogger));
+                tasks.push_back(new CheckShutdown(this, mPrx));
+                ExecutorPtr runner(new Executor(tasks, mLogger));
+                runner->start();
+            }
+            catch (const std::exception& ex)
             {
-                removed->add(session->getSession());
-                mSessions->removeSession(session->getBridgedSession());
+                data->getProxy()->ice_exception(ex);
+            }
+            catch (...)
+            {
+                data->getProxy()->ice_exception();
             }
         }
-        QueuedTasks tasks;
-
-        BridgeCookies cookies;
-        {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            cookies = getCookies();
-        }
-        tasks.push_back(new RemoveSessionsNotify(mListeners, removed, cookies));
-        tasks.push_back(new RemoveSessionControllerTask(mObjAdapter, removed, mLogger));
-        tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
-        tasks.push_back(new DisconnectTelephonyEventsTask(this, sessions, mLogger));
-        tasks.push_back(new CheckShutdown(this, mPrx));
-        ExecutorPtr runner(new Executor(tasks, mLogger));
-        runner->start();
     }
     catch (const std::exception& ex)
     {
@@ -1700,83 +1874,112 @@ SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
     return mSessions->getSessionSeq();
 }
 
-void BridgeImpl::shutdown(const Ice::Current& current)
+void BridgeImpl::shutdown(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const Ice::Current& current)
 {
-    mSessions->reap();
-    //
-    // In an effort to allow some consistency with replicas, the shutdown operation is broken into
-    // two parts. In an asynchronous version, this would probably be a couple of queued tasks.
-    //
+    ContextDataPtr data(checkAndThrow(mOperationContextCache, operationContext));
+    if (data)
+    {
+        try
+        {
+            mSessions->reap();
+            //
+            // In an effort to allow some consistency with replicas, the
+            // shutdown operation is broken into two parts. In an asynchronous
+            // version, this would probably be a couple of queued tasks.
+            //
     
-    //
-    // When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
-    // no other internal locks.
-    //
-    mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
+            //
+            // When shutting down, the bridge makes a copy of its current
+            // state and unlocks, proceeding with no other internal locks.
+            //
+            mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
 
-    BridgeStateItemPtr update;
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        if (mState->runningState == ShuttingDown)
+            BridgeStateItemPtr update;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                if (mState->runningState == ShuttingDown)
+                {
+                    mLogger(Trace) << FUNLOG << ": called when shutting down." ;
+                    return;
+                }
+                if (mState->runningState == Destroyed)
+                {
+                    mLogger(Trace) << FUNLOG << ": called when destroyed." ;
+                    throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+                }
+                mState->runningState = ShuttingDown;
+
+                mListeners->stopping(getCookies());
+                update = createUpdate();
+            }
+            pushUpdate(update);
+            SessionCollectionPtr currentCollection;
+            {
+                //
+                // Currently the slice defines the response "Normal Clearing" as the default
+                // value so we shouldn't need to set anything here.
+                //
+                ResponseCodePtr responseCode = new ResponseCode;
+                ShutdownSessionOperation shutdownOp(mSessionListenerPrx, responseCode, mLogger);
+                mSessions->visitSessions(shutdownOp);
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                mListeners->stopped(getCookies());
+                mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
+                mState->runningState = Destroyed;
+                //
+                // Remove references to the session listener implementation.
+                //
+                mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+                mSessionListener = 0;
+                currentCollection = mSessions;
+                mSessions = 0;
+            }
+            if (currentCollection)
+            {
+                currentCollection->destroy();
+            }
+    
+            try
+            {
+                if (replicate())
+                {
+                    Ice::StringSeq keys;
+                    keys.push_back(mState->key);
+                    mReplicator->removeState(AsteriskSCF::Operations::createContext(operationContext), keys);
+                }
+            }
+            catch (const Ice::Exception&)
+            {
+            }
+
+            mSessions = 0;
+            data->getMonitor()->setCompleted();
+        }
+        catch (const Ice::Exception& ex)
         {
-            mLogger(Trace) << FUNLOG << ": called when shutting down." ;
-            return;
+            data->setException(ExceptionWrapper::create(ex));
+            throw;
         }
-        if (mState->runningState == Destroyed)
+        catch (const std::exception& ex)
         {
-            mLogger(Trace) << FUNLOG << ": called when destroyed." ;
-            throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+            data->setException(ExceptionWrapper::create(ex));
+            throw;
         }
-        mState->runningState = ShuttingDown;
-
-        mListeners->stopping(getCookies());
-        update = createUpdate();
-    }
-    pushUpdate(update);
-    SessionCollectionPtr currentCollection;
-    {
-        //
-        // Currently the slice defines the response "Normal Clearing" as the default
-        // value so we shouldn't need to set anything here.
-        //
-        ResponseCodePtr responseCode = new ResponseCode;
-        ShutdownSessionOperation shutdownOp(mSessionListenerPrx, responseCode, mLogger);
-        mSessions->visitSessions(shutdownOp);
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mListeners->stopped(getCookies());
-        mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
-        mState->runningState = Destroyed;
-        //
-        // Remove references to the session listener implementation.
-        //
-        mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
-        mSessionListener = 0;
-        currentCollection = mSessions;
-        mSessions = 0;
-    }
-    if (currentCollection)
-    {
-        currentCollection->destroy();
-    }
-    
-    try
-    {
-        if (replicate())
+        catch (...)
         {
-            Ice::StringSeq keys;
-            keys.push_back(mState->key);
-            mReplicator->removeState(keys);
+            data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+            throw;
         }
     }
-    catch (const Ice::Exception&)
-    {
-    }
-
-    mSessions = 0;
 }
 
-void BridgeImpl::destroy(const Ice::Current& current)
+void BridgeImpl::destroy(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current)
 {
+    //
+    // destroy() currently does *no* RPCs except to the replicator so
+    // this should be okay.
+    //
     mLogger(Trace) << FUNLOG << ":" <<  objectIdFromCurrent(current);
     BridgeStateItemPtr update;
     SessionCollectionPtr currentCollection;
@@ -1817,7 +2020,7 @@ void BridgeImpl::destroy(const Ice::Current& current)
         {
             Ice::StringSeq keys;
             keys.push_back(mState->key);
-            mReplicator->removeState(keys);
+            mReplicator->removeState(AsteriskSCF::Operations::createContext(context), keys);
         }
     }
     catch (const Ice::Exception&)
@@ -1828,154 +2031,220 @@ void BridgeImpl::destroy(const Ice::Current& current)
 void BridgeImpl::addListener(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, 
     const BridgeListenerPrx& listener, const Ice::Current& current)
 {
-    if (!mOperationContextCache->addOperationContext(operationContext))
+    ContextDataPtr data(checkAndThrow(mOperationContextCache, operationContext));
+    if (data)
     {
-        return;
-    }
-    mLogger(Trace) << FUNLOG << ":" <<  objectIdFromCurrent(current) << " with bridge listener: " <<
-        (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
-    if (!listener)
-    {
-        //
-        // TODO should probably throw an exception here.
-        //
-        return;
-    }
+        try
+        {
+            mLogger(Trace) << FUNLOG << ":" <<  objectIdFromCurrent(current) << " with bridge listener: " <<
+                (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
+            if (!listener)
+            {
+                //
+                // TODO should probably throw an exception here.
+                //
+                return;
+            }
 
-    //
-    // Careful about ordering lest short-circuit become an issue
-    //
-    if (mListeners->addListener(listener) && replicate())
-    {
-        ReplicatedStateItemSeq seq;
-        seq.push_back(createFirstListenerUpdate(listener));
-        mReplicator->setState(seq);
+            //
+            // Careful about ordering lest short-circuit become an issue
+            //
+            if (mListeners->addListener(listener) && replicate())
+            {
+                ReplicatedStateItemSeq seq;
+                seq.push_back(createFirstListenerUpdate(listener));
+                mReplicator->setState(AsteriskSCF::Operations::createContext(operationContext), seq);
+            }
+            data->getMonitor()->setCompleted();
+        }
+        catch (const Ice::Exception& ex)
+        {
+            data->setException(ExceptionWrapper::create(ex));
+            throw;
+        }
+        catch (const std::exception& ex)
+        {
+            data->setException(ExceptionWrapper::create(ex));
+            throw;
+        }
+        catch (...)
+        {
+            data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+            throw;
+        }
     }
 }
 
 void BridgeImpl::removeListener(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
     const BridgeListenerPrx& listener, const Ice::Current& current)
 {
-    if (!mOperationContextCache->addOperationContext(operationContext))
-    {
-        return;
-    }
-    mLogger(Trace) << FUNLOG << ":" <<  objectIdFromCurrent(current) << " with bridge listener: " <<
-        (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
-    if (!listener)
+    ContextDataPtr data(checkAndThrow(mOperationContextCache, operationContext));
+    if (data)
     {
-        //
-        // TODO should probably throw an exception here.
-        //
-        return;
-    }
-    if (mListeners->removeListener(listener))
-    {
-        string key = mState->bridgeId + ".listener." +
-            mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
-        if (replicate())
+        try
         {
-            try
+            mLogger(Trace) << FUNLOG << ":" <<  objectIdFromCurrent(current) << " with bridge listener: " <<
+                (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
+            if (!listener)
             {
-                Ice::StringSeq keys;
-                keys.push_back(key);
-                mReplicator->removeState(keys);
+                //
+                // TODO should probably throw an exception here.
+                //
+                return;
             }
-            catch (const std::exception& ex)
+            if (mListeners->removeListener(listener))
             {
-                mLogger(Error) << "call to remove state item " << key << " failed with exception " << ex.what();
+                string key = mState->bridgeId + ".listener." +
+                    mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+                if (replicate())
+                {
+                    try
+                    {
+                        Ice::StringSeq keys;
+                        keys.push_back(key);
+                        mReplicator->removeState(AsteriskSCF::Operations::createContext(operationContext), keys);
+                    }
+                    catch (const std::exception& ex)
+                    {
+                        mLogger(Error) << "call to remove state item " << key << " failed with exception " << ex.what();
+                    }
+                }
             }
+            data->getMonitor()->setCompleted();
+        }
+        catch (const Ice::Exception& ex)
+        {
+            data->setException(ExceptionWrapper::create(ex));
+            throw;
+        }
+        catch (const std::exception& ex)
+        {
+            data->setException(ExceptionWrapper::create(ex));
+            throw;
+        }
+        catch (...)
+        {
+            data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+            throw;
         }
     }
 }
 
+typedef AMDContextData<AMD_Bridge_replaceSessionPtr> ReplaceSessionContextData;
+typedef boost::shared_ptr<ReplaceSessionContextData> ReplaceSessionContextDataPtr;
+
 void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callback,
-    const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const AsteriskSCF::System::V1::OperationContextPtr& context,
     const SessionPrx& sessionToReplace,
     const SessionWithSessionInfoSeq& newSessionInfos,
     const Ice::Current& current)
 {
     try
     {
-        if (!mOperationContextCache->addOperationContext(operationContext))
-        {
-            callback->ice_response();
-            return;
-        }
-        mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
-        mSessions->reap();
-        SessionSeq newSessions;
-        newSessions.resize(newSessionInfos.size());
-        std::transform(newSessionInfos.begin(), newSessionInfos.end(), newSessions.begin(), extractSession);
-        checkSessions(newSessions);
-        statePreCheck();
-
-        SessionWrapperPtr session = mSessions->getSession(sessionToReplace);
-        //
-        // If the session did not exist on this bridge, then this operation should not proceed.
-        //
-        if (!session)
+        ReplaceSessionContextDataPtr data(getContext<ReplaceSessionContextDataPtr,
+                                                     AMD_Bridge_replaceSessionPtr>(
+                                                         mOperationContextCache, context,
+                                                         callback));
+        if (data)
         {
-            throw SessionNotFound(sessionToReplace);
-        }
-        //
-        // Shutdown is inheritently asynchronous (see SessionWrapper::shutdown())
-        //
-        SessionsTrackerPtr removeTracker(new SessionsTracker);
-        removeTracker->add(session->getSession());
-        session->shutdown(mSessionListenerPrx, new ResponseCode);
+            try
+            {
+                mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
+                mSessions->reap();
+                SessionSeq newSessions;
+                newSessions.resize(newSessionInfos.size());
+                std::transform(newSessionInfos.begin(), newSessionInfos.end(), newSessions.begin(), extractSession);
+                checkSessions(newSessions);
+                statePreCheck();
+
+                SessionWrapperPtr session = mSessions->getSession(sessionToReplace);
+                //
+                // If the session did not exist on this bridge, then this operation should not proceed.
+                //
+                if (!session)
+                {
+                    try
... 4873 lines suppressed ...


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list