[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue May 8 17:26:56 CDT 2012


branch "master" has been updated
       via  55fbdc3b30aed0195d034082502f7798b0351059 (commit)
      from  75c04280f9f1c35bf3927dc83350870ddeee2395 (commit)

Summary of changes:
 config/test_bridging.conf                          |    9 +-
 .../BridgeService/BridgeReplicatorIf.ice           |   37 +-
 src/BridgeCreationExtensionPointImpl.cpp           |  114 +-
 src/BridgeCreationExtensionPointImpl.h             |    3 +-
 src/BridgeImpl.cpp                                 | 1627 +++++++++++++-------
 src/BridgeImpl.h                                   |   34 +-
 src/BridgeListenerMgr.cpp                          |   33 +-
 src/BridgeListenerMgr.h                            |   11 +
 src/BridgeManagerImpl.cpp                          |  890 ++++++++----
 src/BridgeManagerImpl.h                            |    3 +-
 src/BridgeManagerListenerMgr.cpp                   |   23 +-
 src/BridgeManagerListenerMgr.h                     |    5 +
 src/BridgePartyIdExtensionPoint.cpp                |  271 ++++-
 src/BridgePartyIdExtensionPoint.h                  |   31 +-
 src/BridgeReplicatorStateListenerI.cpp             |   53 +-
 src/BridgeReplicatorStateListenerI.h               |    5 +-
 src/BridgeServiceConfig.h                          |   44 +-
 src/CMakeLists.txt                                 |    1 +
 src/Component.cpp                                  |    5 +-
 src/ComponentStateReplicator.cpp                   |    6 +-
 src/MediaMixer.cpp                                 |   19 +-
 src/MediaSplicer.cpp                               |  138 ++-
 src/MediaSplicer.h                                 |    9 +-
 ...atorStateListenerI.h => ReplicatorSmartProxy.h} |   30 +-
 src/ServiceUtil.h                                  |    9 +-
 src/SessionCollection.cpp                          |   44 +-
 src/SessionCollection.h                            |   25 +-
 src/SessionListener.cpp                            |  281 ++--
 src/SessionListener.h                              |   50 +-
 src/SessionOperations.cpp                          |  157 ++-
 src/SessionOperations.h                            |  114 ++-
 src/SessionWrapper.cpp                             |  163 ++-
 src/SessionWrapper.h                               |   57 +-
 test/BridgeListenerI.cpp                           |   15 +-
 test/BridgeListenerI.h                             |   14 +-
 test/BridgeManagerListenerI.cpp                    |   12 +-
 test/BridgeManagerListenerI.h                      |    9 +-
 test/CMakeLists.txt                                |   36 +-
 test/ReplicationUnitTests.cpp                      |  191 +++
 test/TestBridging.cpp                              |  182 ++-
 test/UnitTests.cpp                                 |   69 +-
 41 files changed, 3416 insertions(+), 1413 deletions(-)
 copy src/{BridgeReplicatorStateListenerI.h => ReplicatorSmartProxy.h} (55%)
 create mode 100644 test/ReplicationUnitTests.cpp


- Log -----------------------------------------------------------------
commit 55fbdc3b30aed0195d034082502f7798b0351059
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Tue May 8 09:56:33 2012 -0500

    Retry logic changes. Also includes use of base State Replicator class.

diff --git a/config/test_bridging.conf b/config/test_bridging.conf
index 2854d81..09079b8 100644
--- a/config/test_bridging.conf
+++ b/config/test_bridging.conf
@@ -20,7 +20,11 @@ IceBox.InheritProperties=1
 # It's important to specify a reasonable size for the client thread pool for services
 # that use AMI/AMD
 # 
-Ice.ThreadPool.Client.Size=4
+Ice.ThreadPool.Client.Size=10
+Ice.ThreadPool.Server.Size=10
+
+# For unit tests, we don't replicate the Service Locator
+ServiceDiscovery.Standalone = true
 
 ServiceDiscovery.IceStorm.TopicManager.Endpoints=default -p 4421
 ServiceDiscovery.IceStorm.TopicManager.ThreadPool.Size=4
@@ -41,6 +45,7 @@ LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -h 127.0.0.1 -p 4422
 
 LoggerAdapter.Endpoints=default
 Logger.ServiceAdapter.Endpoints=default
+Logger.Standalone = true
 AsteriskSCF.LoggingService.Endpoints=default
 AsteriskSCF.LoggingService.ThreadPool.Size=4
 AsteriskSCF.LoggingClient.Endpoints=default
@@ -81,12 +86,10 @@ TestBridge.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 57000
 TestBridge.ServiceAdapter.ThreadPool.Size=2
 TestBridge.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 57001
 TestBridge.BackplaneAdapter.ThreadPool.Size=2
-TestBridge.Standby=false
 TestBridge.LogLevel=1
 
 TestBridge2.InstanceName=TestBridge2
 TestBridge2.BridgeManagerObjectId=TestBridgeManager2
-TestBridge2.Standby=true
 TestBridge2.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 57010
 TestBridge2.ServiceAdapter.ThreadPool.Size=4
 TestBridge2.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 57011
diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 3dbe763..8fc042d 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -20,6 +20,7 @@
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsExtensionPointsIf.ice>
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
 #include <AsteriskSCF/Media/MediaIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
 
 module AsteriskSCF
 {
@@ -75,6 +76,15 @@ enum MediaOperationReplicationPolicy
     Reconstruct         /* Media operations are reconstructed on the replica */
 };
 
+
+/**
+ * Items to support replication of contexts. The strategy here is to simply
+ * replicate a context along with a coded value to indicate what operation
+ * the context was for. The coded value is a simple integral constant for
+ * the time being. This is a somewhat heavy-handed, brute-force approach,
+ * but after trying other approaches, I prefer this one.
+ */
+
 /**
  *
  * A bridge establishes connections between media objects associated with sessions.
@@ -158,6 +168,7 @@ sequence<BridgedSession> BridgedSessionSeq;
  **/
 enum ServiceState
 {
+    Starting,
     Running,
     Paused,
     ShuttingDown,
@@ -170,10 +181,13 @@ sequence<AsteriskSCF::SessionCommunications::V1::BridgeManagerListener*> BridgeM
  */
 class PartyIdHooks
 {
-    AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ReceivedConnectedLinePartyIdHookSeq receivedConnectedLineHooks;
-    AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingConnectedLinePartyIdHookSeq forwardingConnectedLineHooks;
+    AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ReceivedConnectedLinePartyIdHookSeq
+        receivedConnectedLineHooks;
+    AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingConnectedLinePartyIdHookSeq
+        forwardingConnectedLineHooks;
     AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingCallerPartyIdHookSeq forwardingCallerHooks;
-    AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingRedirectionsPartyIdHookSeq forwardingRedirectionsHooks;
+    AsteriskSCF::SessionCommunications::ExtensionPoints::V1::ForwardingRedirectionsPartyIdHookSeq
+        forwardingRedirectionsHooks;
 };
 
 /**
@@ -219,6 +233,8 @@ class BridgeStateItem extends ReplicatedStateItem
      * its own snapshot of applicable hooks. 
      */
     PartyIdHooks partyIdHookSet;
+
+    AsteriskSCF::System::V1::OperationContext originatingContext;
 };
 
 class BridgeListenerStateItem extends ReplicatedStateItem
@@ -227,6 +243,7 @@ class BridgeListenerStateItem extends ReplicatedStateItem
     AsteriskSCF::SessionCommunications::V1::BridgeListener* listener;
 };
 
+
 /**
  *
  * The BridgeReplicatorListener interface must be implemented by components
@@ -236,8 +253,8 @@ class BridgeListenerStateItem extends ReplicatedStateItem
  **/
 interface ReplicatorListener
 {
-    void stateRemoved(Ice::StringSeq itemKeys);
-    void stateSet(ReplicatedStateItemSeq items);
+    idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+    idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatedStateItemSeq items);
 };
 
 /**
@@ -248,11 +265,13 @@ interface ReplicatorListener
  **/
 interface Replicator
 {
-    void addListener(ReplicatorListener* listener);
-    void removeListener(ReplicatorListener* listener);
+    idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext,
+            ReplicatorListener* listener);
+    idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext,
+            ReplicatorListener* listener);
 
-    void setState(ReplicatedStateItemSeq items);
-    void removeState(Ice::StringSeq items);
+    idempotent void setState(AsteriskSCF::System::V1::OperationContext operationContext, ReplicatedStateItemSeq items);
+    idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
 
     idempotent ReplicatedStateItemSeq getState(Ice::StringSeq itemKeys);
     idempotent ReplicatedStateItemSeq getAllState();
diff --git a/src/BridgeCreationExtensionPointImpl.cpp b/src/BridgeCreationExtensionPointImpl.cpp
index 172321f..105d6fa 100755
--- a/src/BridgeCreationExtensionPointImpl.cpp
+++ b/src/BridgeCreationExtensionPointImpl.cpp
@@ -17,12 +17,16 @@
 #include "BridgeCreationExtensionPointImpl.h"
 #include <boost/thread/shared_mutex.hpp>
 #include "BridgeServiceConfig.h"
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 using namespace AsteriskSCF::BridgeService;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::SessionCommunications::ExtensionPoints::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::Hook::V1;
+using namespace AsteriskSCF::Operations;
 
 namespace 
 {
@@ -30,46 +34,97 @@ namespace
     {
     public:
         BridgeExtPtImpl(const Logger& logger) :
-            mLogger(logger)
+            mLogger(logger),
+            mContextCache(AsteriskSCF::Operations::OperationContextCache::create(60)) /* XXX make configurable */
         {
         }
 
-        void addHook(const BridgeCreationHookPrx& newHook, const Ice::Current& current)
+        void addHook(const AsteriskSCF::System::V1::OperationContextPtr& context, const 
+            BridgeCreationHookPrx& newHook, const Ice::Current& current)
         {
-            UniqueLock lock(mLock);
-            BridgeCreationHookSeq::iterator i = 
-                find_if(mHooks.begin(), mHooks.end(), IdentityComparePredicate<BridgeCreationHookPrx>(newHook));
-            if (i != mHooks.end())
-            {
-                mLogger(Trace) << "refreshing creation hook " << newHook << " on " 
-                    << objectIdFromCurrent(current);
-                //
-                // Refresh the proxy if it is already in the vector.
-                // 
-                *i = newHook;
-            }
-            else
+            ContextDataPtr data(checkAndThrow(mContextCache, context));
+            if (data)
             {
-                mLogger(Trace) << "adding creation hook " << newHook << " on " 
-                    << objectIdFromCurrent(current);
-                mHooks.push_back(newHook);
+                try
+                {
+                    UniqueLock lock(mLock);
+                    BridgeCreationHookSeq::iterator i = 
+                        find_if(mHooks.begin(), mHooks.end(), IdentityComparePredicate<BridgeCreationHookPrx>(newHook));
+                    if (i != mHooks.end())
+                    {
+                        mLogger(Trace) << "refreshing creation hook " << newHook << " on " 
+                                       << objectIdFromCurrent(current);
+                        //
+                        // Refresh the proxy if it is already in the vector.
+                        // 
+                        *i = newHook;
+                    }
+                    else
+                    {
+                        mLogger(Trace) << "adding creation hook " << newHook << " on " 
+                                       << objectIdFromCurrent(current);
+                        mHooks.push_back(newHook);
+                    }
+                    data->getMonitor()->setCompleted();
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (const std::exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (...)
+                {
+                    data->setException(ExceptionWrapper::create("Unknown unexpected exception"));
+                    throw;
+                }
             }
         }
 
-        void removeHook(const BridgeCreationHookPrx& hookToRemove, const Ice::Current& current)
+        void removeHook(const AsteriskSCF::System::V1::OperationContextPtr& context, 
+            const BridgeCreationHookPrx& hookToRemove, const Ice::Current& current)
         {
-            UniqueLock lock(mLock);
-            mLogger(Trace) << "removing creation hook " << hookToRemove << " on " 
-                << objectIdFromCurrent(current);
-            mHooks.erase(remove_if(mHooks.begin(), mHooks.end(), 
-                    IdentityComparePredicate<BridgeCreationHookPrx>(hookToRemove)), mHooks.end());
+            ContextDataPtr data(checkAndThrow(mContextCache, context));
+            if (data)
+            {
+                try
+                {
+                    UniqueLock lock(mLock);
+                    mLogger(Trace) << "removing creation hook " << hookToRemove << " on " 
+                                   << objectIdFromCurrent(current);
+                    mHooks.erase(remove_if(mHooks.begin(), mHooks.end(), 
+                            IdentityComparePredicate<BridgeCreationHookPrx>(hookToRemove)), mHooks.end());
+                }
+                catch (const Ice::Exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (const std::exception& ex)
+                {
+                    data->setException(ExceptionWrapper::create(ex));
+                    throw;
+                }
+                catch (...)
+                {
+                    data->setException(ExceptionWrapper::create("Unknown unexpected exception"));
+                    throw;
+                }
+            }
         }
 
-        void clearHooks(const Ice::Current& current)
+        void clearHooks(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current)
         {
-            UniqueLock lock(mLock);
-            mLogger(Trace) << "clearing hooks from " << objectIdFromCurrent(current);
-            mHooks.clear();
+            if (mContextCache->addOperationContext(context))
+            {
+                UniqueLock lock(mLock);
+                mLogger(Trace) << "clearing hooks from " << objectIdFromCurrent(current);
+                mHooks.clear();
+            }
         }
 
         BridgeCreationHookDataPtr runHooks(const BridgeCreationHookDataPtr& originalData)
@@ -83,7 +138,7 @@ namespace
                 try
                 {
                     BridgeCreationHookDataPtr resultData; 
-                    HookResult result = (*i)->execute(tokenData, resultData);
+                    HookResult result = (*i)->execute(AsteriskSCF::Operations::createContext(), tokenData, resultData);
                     if (result.status == AsteriskSCF::System::Hook::V1::Succeeded)
                     {
                         tokenData = resultData;
@@ -110,6 +165,7 @@ namespace
 
         Logger mLogger;
         BridgeCreationHookSeq mHooks;
+        AsteriskSCF::Operations::OperationContextCachePtr mContextCache;
 
         void removeHooks(const BridgeCreationHookSeq& hooks)
         {
diff --git a/src/BridgeCreationExtensionPointImpl.h b/src/BridgeCreationExtensionPointImpl.h
index 5cc6c6a..b3b2cfc 100755
--- a/src/BridgeCreationExtensionPointImpl.h
+++ b/src/BridgeCreationExtensionPointImpl.h
@@ -38,7 +38,8 @@ namespace BridgeService
     /**
      * Base class for internal methods.
      */
-    class BridgeCreationExtensionPointImpl : public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::BridgeCreationExtensionPoint
+    class BridgeCreationExtensionPointImpl :
+        public AsteriskSCF::SessionCommunications::ExtensionPoints::V1::BridgeCreationExtensionPoint
     {
     public:
 
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 688e61f..ae578b4 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -19,6 +19,7 @@
 #include <AsteriskSCF/System/Component/ComponentServiceIf.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 #include <Ice/Ice.h>
+#include <Ice/IncomingAsync.h>
 #include <boost/thread/locks.hpp>
 #include <memory>
 #include <algorithm>
@@ -29,6 +30,9 @@
 #include "SessionWrapper.h"
 #include "SessionOperations.h"
 #include "SessionListener.h"
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
@@ -36,28 +40,92 @@ using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
 using namespace AsteriskSCF::SessionCommunications::ExtensionPoints::V1;
 using namespace AsteriskSCF::BridgeService;
 using namespace AsteriskSCF::Replication::BridgeService::V1;
+using namespace AsteriskSCF::Operations;
 using namespace AsteriskSCF;
 using namespace std;
 
 /**
  *
- * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
- * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
- * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
- * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
- * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
- * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
- * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
- * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
- * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
- * refactoring for replication could have occurred in that development branch. However, replication is a separate task
- * and it is desirable to have a *relevant* review of it independently of asynchronous support.
+ * NOTE: Once asynchronous invocations began to be added to the mix, several
+ * shortcomings of both the initial implementation of the bridge and its
+ * replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions.
+ * When converting these operations to AMD/AMI, the serial approach completely
+ * falls apart. Especially when replication is thrown into the mix. For
+ * example, in a typical AMD/AMI scenario, the setBridge() and media allocation
+ * operations would occur as AMI requests. The apparent outcome will be that
+ * the participants will "appear" on the call in the order in which the
+ * operations complete (i.e. randomly). Indeed it is possible that some will
+ * ultimately fail and some sessions might actually be removed from the bridge
+ * before they are able to complete. Reconciling the completely asynchronous
+ * nature to an implementation is just too awful. As asynchronous support in
+ * bridging is the driving factor behind these changes, the refactoring for
+ * replication could have occurred in that development branch. However,
+ * replication is a separate task and it is desirable to have a *relevant*
+ * review of it independently of asynchronous support.
  *
  */
 
 namespace
 {
 
+class SessionsTracker : public IceUtil::Shared
+{
+public:
+    SessionsTracker() :
+        mResponses(0)
+    {
+    }
+    
+    void add(const SessionPrx& s)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mSessions.push_back(s);
+        ++mResponses;
+    }
+ 
+    SessionSeq getSessions()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mSessions;
+    }
+
+    void addException(const SessionPrx& session, const Ice::Exception& ex)
+    {
+        addExceptionMessage(session, ex.what());
+    }
+
+    void addExceptionMessage(const SessionPrx& session, const string& msg)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        SessionError error;
+        error.failedSession = session;
+        error.message = msg;
+        mExceptions.push_back(error);
+        ++mResponses;
+    }
+
+    SessionErrorSeq getExceptions()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mExceptions;
+    }
+
+    size_t responseCount()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mResponses;
+    }
+
+private:
+    boost::shared_mutex mLock;
+    SessionSeq mSessions;
+    SessionErrorSeq mExceptions;
+    size_t mResponses;
+};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
+
+
 /**
  * 
  * BridgeImpl is a reference implmentation of the AsteriskSCF::Bridging::V1::Bridge
@@ -80,25 +148,33 @@ public:
     // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
     //
     void addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
-            const SessionWithSessionInfoSeq& sessionInfos,
-            const Ice::Current&);
-    void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
-            const Ice::Current&);
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionWithSessionInfoSeq& sessionInfos,
+        const Ice::Current&);
+    void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback,
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionSeq& sessions,
+        const Ice::Current&);
 
     SessionSeq listSessions(const Ice::Current&);
-    void shutdown(const Ice::Current& current);
-    void destroy(const Ice::Current& current);
+    void shutdown(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current);
+    void destroy(const AsteriskSCF::System::V1::OperationContextPtr& context, const Ice::Current& current);
 
-    void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
-    void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+    void addListener(const AsteriskSCF::System::V1::OperationContextPtr&,
+        const BridgeListenerPrx& listener, const Ice::Current& current);
+    void removeListener(const AsteriskSCF::System::V1::OperationContextPtr&,
+        const BridgeListenerPrx& listener, const Ice::Current& current);
 
-    void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac,
-            const SessionPrx& sessionToReplace,
-            const SessionWithSessionInfoSeq& newSessionInfos,
-            const Ice::Current& current);
+    void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callback,
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionPrx& sessionToReplace,
+        const SessionWithSessionInfoSeq& newSessionInfos,
+        const Ice::Current& current);
 
-    void setCookies(const BridgeCookies& cookies, const Ice::Current&);
-    void removeCookies(const BridgeCookies& cookies, const Ice::Current&);
+    void setCookies(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const BridgeCookies& cookies, const Ice::Current&);
+    void removeCookies(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const BridgeCookies& cookies, const Ice::Current&);
     BridgeCookies getCookies(const BridgeCookies& cookies, const Ice::Current&);
 
     //
@@ -112,7 +188,7 @@ public:
     bool destroyed();
     void destroyImpl();
     void shutdownImpl(const Ice::Current& current);
-    void activate(const BridgePrx& proxy, const std::string& id);
+    void finishSetup(const BridgePrx& proxy, const std::string& id);
 
     void updateState(const BridgeStateItemPtr& state);
     void addListener(const BridgeListenerStateItemPtr& update);
@@ -124,15 +200,21 @@ public:
 
     void forceUpdate();
 
-    void getAddSessionsTasks(QueuedTasks& tasks, const SessionPrx& source, const SessionSeq& sessions, const CallerPtr& callerID, const RedirectionsPtr& redirects);
+    void getAddSessionsTasks(QueuedTasks& tasks, 
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const SessionPrx& source, const SessionSeq& sessions,
+        const CallerPtr& callerID, const RedirectionsPtr& redirects);
 
-    PartyIdHooksPtr getPartyIdHooks()
+    PartyIdHooksPtr getPartyIdHooks() const
     {
         return mState->partyIdHookSet;
     }
 
     BridgeCookies getCookies()
     {
+        //
+        // TODO: use transform?
+        //
         BridgeCookies result;
         for (BridgeCookieDict::const_iterator i = mState->cookies.begin(); i != mState->cookies.end(); ++i)
         {
@@ -141,8 +223,12 @@ public:
         return result;
     }
 
-    void updateConnectedLine(const SessionWrapperPtr& sourceSession, const ConnectedLinePtr& connectedLine);
-    void updateRedirections(const SessionWrapperPtr& sourceSession, const RedirectionsPtr& redirections);
+    void updateConnectedLine(const SessionWrapperPtr& sourceSession, 
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const ConnectedLinePtr& connectedLine);
+    void updateRedirections(const SessionWrapperPtr& sourceSession, 
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const RedirectionsPtr& redirections);
 
 private:
 
@@ -185,6 +271,8 @@ private:
     //
     Logger mLogger;
 
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationContextCache;
+
     void statePreCheck();
     BridgeStateItemPtr createUpdate();
     void pushUpdate(const BridgeStateItemPtr& update);
@@ -199,9 +287,9 @@ private:
 typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
 
 //
-// simply checks for nulls at this point.
+// Simply checks for nulls at this point.
 //
-static void checkSessions(const SessionSeq& sessions)
+void checkSessions(const SessionSeq& sessions)
 {
     Ice::LongSeq invalidIndexes;
     Ice::Long index = 0;
@@ -218,63 +306,6 @@ static void checkSessions(const SessionSeq& sessions)
     }
 }
 
-class SessionsTracker : public IceUtil::Shared
-{
-public:
-    SessionsTracker() :
-        mResponses(0)
-    {
-    }
-    
-    void add(const SessionPrx& s)
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mSessions.push_back(s);
-        ++mResponses;
-    }
- 
-    SessionSeq getSessions()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return mSessions;
-    }
-
-    void addException(const SessionPrx& session, const Ice::Exception& ex)
-    {
-        addExceptionMessage(session, ex.what());
-    }
-
-    void addExceptionMessage(const SessionPrx& session, const string& msg)
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        SessionError error;
-        error.failedSession = session;
-        error.message = msg;
-        mExceptions.push_back(error);
-        ++mResponses;
-    }
-
-    SessionErrorSeq getExceptions()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return mExceptions;
-    }
-
-    size_t responseCount()
-    {
-        boost::shared_lock<boost::shared_mutex> lock(mLock);
-        return mResponses;
-    }
-
-private:
-    boost::shared_mutex mLock;
-    SessionSeq mSessions;
-    SessionErrorSeq mExceptions;
-    size_t mResponses;
-};
-typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
-
-
 /**
  * Forwards the redirection records for the specified session
  * to every other session in the bridge. Applies the 
@@ -289,11 +320,14 @@ typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
 class ForwardRedirectionsUpdatedTask : public QueuedTask
 {
 public:
-    ForwardRedirectionsUpdatedTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& sourceSession,
-            const RedirectionsPtr& redirections,
-            const Logger& logger) :
+    ForwardRedirectionsUpdatedTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& sourceSession,
+        const RedirectionsPtr& redirections,
+        const Logger& logger) :
         QueuedTask("ForwardRedirectionsUpdatedTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSourceSession(sourceSession),
         mRedirections(redirections),
@@ -312,17 +346,25 @@ protected:
         }
         try
         {
-
+            //
             // Forward the ConnectedLine to each bridged session.
+            //
             SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
             for(SessionSeq::iterator i = sessions.begin();
                 i != sessions.end(); ++i)
             {
+                //
+                // TODO: This looks like a SessionOperation.
+                //
                 if (!(*i) || (*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
                 {
                     continue;
                 }
 
+                //
+                // TODO: Why doesn't this loop simply use the sequence of SessionWrappers you
+                // can retrieve from the SessionCollection?
+                //
                 SessionWrapperPtr destSessionWrapper = mBridge->getSessions()->getSession(*i);
                 if (destSessionWrapper)
                 {
@@ -359,8 +401,12 @@ protected:
             {
                 try
                 {
-                    // Apply this hook. 
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(mSourceSession,
+                    // Apply this hook.
+                    //
+                    // TODO: Are hooks stateful? Do I have to care about consistent contexts?
+                    //
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(
+                        AsteriskSCF::Operations::createContext(mRootContext), mSourceSession,
                         destinationSession->getSession(),
                         currentRedirections, destSpecificRedirections);
 
@@ -379,12 +425,16 @@ protected:
         SessionControllerPrx sessionController = destinationSession->getSessionController();
         if (sessionController)
         {
+            
             // Forward the info via the SessionController for this session.
-            sessionController->updateRedirections(currentRedirections);
+            sessionController->updateRedirections(
+                calculateOperationContext(mRootContext, identityToString(sessionController->ice_getIdentity())),
+                currentRedirections);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSourceSession;
     RedirectionsPtr mRedirections;
@@ -406,10 +456,13 @@ private:
 class ForwardConnectedLineTask : public QueuedTask
 {
 public:
-    ForwardConnectedLineTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& sourceSession,
-            const Logger& logger) :
+    ForwardConnectedLineTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& sourceSession,
+        const Logger& logger) :
         QueuedTask("ForwardConnectedLineTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSourceSession(sourceSession),
         mLogger(logger)
@@ -437,7 +490,10 @@ protected:
                 return true; 
             }
 
+            //
             // Forward the ConnectedLine to each bridged session.
+            // TODO: Implement in terms of an operation to be used with the visitor.
+            //
             SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
             for(SessionSeq::iterator i = sessions.begin();
                 i != sessions.end(); ++i)
@@ -485,7 +541,9 @@ protected:
                 try
                 {
                     // Apply a hook
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(mSourceSession,
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(
+                        AsteriskSCF::Operations::createContext(mRootContext),
+                        mSourceSession,
                         destinationSession->getSession(),
                         currentConnectedLine, destSpecificConnectedLine);
 
@@ -505,11 +563,13 @@ protected:
         if (sessionController)
         {
             // Forward the info via the SessionController for this session.
-            sessionController->updateConnectedLine(currentConnectedLine);
+            sessionController->updateConnectedLine(calculateOperationContext(mRootContext, identityToString(sessionController->ice_getIdentity())),
+                currentConnectedLine);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSourceSession;
     Logger mLogger;
@@ -518,11 +578,14 @@ private:
 class ForwardCallerIDTask : public QueuedTask
 {
 public:
-    ForwardCallerIDTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& source,
-            const CallerPtr& callerID,
-            const Logger& logger) :
+    ForwardCallerIDTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& source,
+        const CallerPtr& callerID,
+        const Logger& logger) :
         QueuedTask("ForwardConnectedLineTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSource(source),
         mCallerID(callerID),
@@ -591,7 +654,9 @@ protected:
                 try
                 {
                     // Apply a hook
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(mSource,
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(
+                        AsteriskSCF::Operations::createContext(mRootContext), 
+                        mSource,
                         destinationSession->getSession(),
                         currentCallerID, destSpecificCallerID);
 
@@ -612,11 +677,14 @@ protected:
         SessionControllerPrx sessionController =  destinationSession->getSessionController();
         if (sessionController)
         {
-            sessionController->updateCallerID(currentCallerID);
+            sessionController->updateCallerID(
+                calculateOperationContext(mRootContext, identityToString(sessionController->ice_getIdentity())),
+                    currentCallerID);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSource;
     CallerPtr mCallerID;
@@ -630,11 +698,14 @@ private:
 class UpdateConnectedLineTask : public QueuedTask
 {
 public:
-    UpdateConnectedLineTask(const BridgeImplPtr& bridge, 
-            const SessionPrx& sourceSession,
-            const ConnectedLinePtr& connectedLine,
-            const Logger& logger) :
+    UpdateConnectedLineTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge, 
+        const SessionPrx& sourceSession,
+        const ConnectedLinePtr& connectedLine,
+        const Logger& logger) :
         QueuedTask("UpdateConnectedLineTask"),
+        mRootContext(rootContext),
         mBridge(bridge),
         mSourceSession(sourceSession),
         mConnectedLine(connectedLine),
@@ -658,20 +729,39 @@ protected:
 
         if (!wrapper)
         {
+            //
+            // TODO: Returning true early allows subsequent tasks to
+            // run. Might be a dubious practice!
+            //
             mLogger(Debug) << "Unable to find matching session for, returning early with true.";
+            return true;
         }
 
         PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
         if (partyIdHooks)
         {
             // Allow the ReceivedConnectedLinePartyId hooks to alter the ConnectedLine record. 
-            for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->receivedConnectedLineHooks.begin();
+            for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i =
+                    partyIdHooks->receivedConnectedLineHooks.begin();
                 i != partyIdHooks->receivedConnectedLineHooks.end(); ++i)
             {
                 try
                 {
-                    // Apply this hook. 
-                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(mSourceSession,
+                    //
+                    // It is quite possible that this can never
+                    // happen, but it is prudent to check all the
+                    // same.
+                    //
+                    if ((*i) == 0)
+                    {
+                        continue;
+                    }
+                    
+                    //
+                    // Apply this hook.
+                    //
+                    AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(
+                        AsteriskSCF::Operations::createContext(mRootContext), mSourceSession,
                         currentConnectedLine, updatedConnectedLine);
 
                     if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
@@ -679,6 +769,11 @@ protected:
                         currentConnectedLine = updatedConnectedLine;
                     }
                 }
+                //
+                // Catching and ignoring an Ice exception here might be
+                // appropriate for the hook calls, but other exceptions really
+                // should be considered bugs.
+                //
                 catch (const std::exception& e)
                 {
                     mLogger(Debug) << FUNLOG << " : " << e.what();
@@ -686,13 +781,18 @@ protected:
             }
         }
 
-        // Cache this value.
+        //
+        // Cache this value.  TODO: can currentConnectedLine be 0 and,
+        // if so, does it make sense to set it? The way
+        // setConnectedLine is written, it won't be set again.
+        //
         wrapper->setConnectedLine(currentConnectedLine);
 
         return true;
     }
            
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionPrx mSourceSession;
     ConnectedLinePtr mConnectedLine;
@@ -702,9 +802,12 @@ private:
 class RemoveSessionsNotify : public QueuedTask
 {
 public:
-    RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
-            const SessionsTrackerPtr& tracker, const BridgeCookies& cookies) :
+    RemoveSessionsNotify(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeListenerMgrPtr& bridgeListeners,
+        const SessionsTrackerPtr& tracker, const BridgeCookies& cookies) :
         QueuedTask("RemoveSessionsNotify"),
+        mRootContext(rootContext),
         mBridgeListeners(bridgeListeners),
         mTracker(tracker),
         mCookies(cookies)
@@ -717,12 +820,13 @@ protected:
         SessionSeq sessions = mTracker->getSessions();
         if (!sessions.empty())
         {
-            mBridgeListeners->sessionsRemoved(sessions, mCookies);
+            mBridgeListeners->sessionsRemoved(mRootContext, sessions, mCookies);
         }
         return true;
     }
            
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeListenerMgrPtr mBridgeListeners;
     SessionsTrackerPtr mTracker;
     BridgeCookies mCookies;
@@ -731,9 +835,11 @@ private:
 class SetBridgeTask : public QueuedTask
 {
 public:
-    SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+    SetBridgeTask(const AsteriskSCF::System::V1::OperationContextPtr& rootContext, 
+            const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
             const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
         QueuedTask("SetBridgeTask"),
+        mRootContext(rootContext),
         mSessionManager(sessionCollection),
         mBridge(bridge),
         mSessionListener(listener),
@@ -758,9 +864,9 @@ protected:
                 continue;
             }
             tasksDispatched = true;
-            (*i)->begin_setBridge(mBridge, mSessionListener,
-                    newCallback_Session_setBridge(this, &SetBridgeTask::set,
-                            &SetBridgeTask::failed), session);
+            (*i)->begin_setBridge(calculateOperationContext(mRootContext, identityToString(mBridge->ice_getIdentity())), mBridge,
+                mSessionListener, newCallback_Session_setBridge(this, &SetBridgeTask::set,
+                    &SetBridgeTask::failed), session);
         }
         return !tasksDispatched;
     }
@@ -773,7 +879,7 @@ protected:
             //
             // setupMedia is an AMI backed implementation, so should not block here.
             //
-            session->setupMedia();
+            session->setupMedia(mRootContext);
         }
         if (mTracker->responseCount() == mSessions.size())
         {
@@ -797,7 +903,7 @@ protected:
             // We want to make sure that session is laying around. The session collection will
             // take care of cleaning it up as long as it is marked as destroyed.
             //
-            session->destroy();
+            session->destroy(mRootContext);
         }
         catch(...)
         {
@@ -817,6 +923,7 @@ protected:
     }
     
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     SessionCollectionPtr mSessionManager;
     BridgePrx mBridge;
     SessionListenerPrx mSessionListener;
@@ -824,41 +931,123 @@ private:
     SessionsTrackerPtr mTracker;
 };
 
+typedef ContextResultData<AsteriskSCF::Media::V1::StreamInformationDict> StreamInformationContextData;
+typedef AMDContextResultData<
+    AsteriskSCF::Media::V1::StreamInformationDict,
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr> AddStreamsContextData;
+
+typedef AMDContextData<
+    AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr> RemoveStreamsContextData;
+
 class BridgeSessionController : public SessionController
 {
 public:
-    BridgeSessionController(const BridgeImplPtr& bridge,
-                            const SessionWrapperPtr& self,
-                            const Logger& logger) : mBridge(bridge), mSelf(self), mLogger(logger) { }
+    BridgeSessionController(const BridgeImplPtr& bridge, const SessionWrapperPtr& self,
+                            const Logger& logger) : 
+        mBridge(bridge), 
+        mSelf(self), 
+        mLogger(logger),
+        mOperationCache(AsteriskSCF::Operations::OperationContextCache::create(60))
+    { 
+    }
 
-    void changeStreamStates_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
-                                  const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
+    void changeStreamStates_async(
+        const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+        const AsteriskSCF::System::V1::OperationContextPtr&,
+        const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
     {
         // We do not care about stream state changes at this point in time
         cb->ice_response();
     }
 
     void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
-                          const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
     {
-        AddStreamsOperationPtr op = new AddStreamsOperation(cb, mSelf, streams, mLogger);
-        mBridge->getSessions()->visitSessions(*op);
+        try
+        {
+            AddStreamsContextData::ptr_type data(getContext<AddStreamsContextData>(mOperationCache,
+                    context, cb));
+            if (data)
+            {
+                try
+                {
+                    AddStreamsOperationPtr op = new AddStreamsOperation(context, data->getProxy(), mSelf, streams, mLogger);
+                    mBridge->getSessions()->visitSessions(*op);
+                }
+                catch (const std::exception& ex)
+                {
+                    data->getProxy()->ice_exception(ex);
+                }
+                catch (...)
+                {
+                    assert("If we got here, really bad things have happened!" == 0);
+                    data->getProxy()->ice_exception();
+                }
+            }
+        }
+        catch (const std::exception& ex)
+        {
+            mLogger(Error) << ex.what() << " exception occurred when trying to get operation context data.";
+            cb->ice_exception(ex);
+        }
+        catch (...)
+        {
+            assert("If we got here, really bad things have happened!" == 0);
+            mLogger(Error) << "Unknown exception occurred when trying to get operation context data.";
+            cb->ice_exception();
+        }
     }
 
     void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
-                             const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+        const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
     {
-        RemoveStreamsOperation op(mSelf, streams);
-        mBridge->getSessions()->visitSessions(op);
-        cb->ice_response();
+        try
+        {
+            RemoveStreamsContextData::ptr_type data(
+                getContext<RemoveStreamsContextData>(mOperationCache, context, cb));
+            if (data)
+            {
+                RemoveStreamsOperation op(context, mSelf, streams);
+                mBridge->getSessions()->visitSessions(op);
+                data->getProxy()->ice_response();
+            }
+        }
+        catch (const std::exception& ex)
+        {
+            mLogger(Error) << ex.what() << " exception occurred when trying to get operation context data.";
+            cb->ice_exception(ex);
+        }
+        catch (...)
+        {
+            assert("If we got here, really bad things have happened!" == 0);
+            mLogger(Error) << "Unknown exception occurred when trying to get operation context data.";
+            cb->ice_exception();
+        }
     }
 
-    void updateConnectedLine(const ConnectedLinePtr& connectedLine, const Ice::Current&)
+    void updateConnectedLine(const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const ConnectedLinePtr& connectedLine, const Ice::Current&)
     {
-        mBridge->updateConnectedLine(mSelf, connectedLine);
+        ContextDataPtr data(checkAndThrow(mOperationCache, context));
+        if (data)
+        {
+            try
+            {
+                mBridge->updateConnectedLine(mSelf, context, connectedLine);
+                data->getMonitor()->setCompleted();
+            }
+            catch (...)
+            {
+                assert("If we got here, really bad things have happened!" == 0);
+                data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+                throw;
+            }
+        }
     }
 
-    void updateCallerID(const CallerPtr&, const Ice::Current&)
+    void updateCallerID(const AsteriskSCF::System::V1::OperationContextPtr&, const CallerPtr&, const Ice::Current&)
     {
         //This shouldn't really ever be called because the caller information doesn't
         //get updated except for once, and that's at bridge creation time. If information
@@ -867,26 +1056,54 @@ public:
         return;
     }
 
-    void updateRedirections(const RedirectionsPtr& redirections, const ::Ice::Current&) 
+    void updateRedirections(const AsteriskSCF::System::V1::OperationContextPtr& context,
+        const RedirectionsPtr& redirections, const ::Ice::Current&) 
     {
-        mBridge->updateRedirections(mSelf, redirections);
+        ContextDataPtr data(checkAndThrow(mOperationCache, context));
+        if (data)
+        {
+            try
+            {
+                mBridge->updateRedirections(mSelf, context, redirections);
+                data->getMonitor()->setCompleted();
+            }
+            catch (const Ice::ObjectNotExistException&)
+            {
+                assert("We should not be ignoring or passing along ONEs" == 0);
+            }
+            catch (const std::exception& ex)
+            {
+                data->setException(ExceptionWrapper::create(ex));
+                throw;
+            }
+            catch (...)
+            {
+                assert("If we got here, really bad things have happened!" == 0);
+                data->setException(ExceptionWrapper::create("Unexpected unknown exception"));
+                throw;
+            }
+        }
     }
 
 private:
     BridgeImplPtr mBridge;
     SessionWrapperPtr mSelf;
     Logger mLogger;
+    AsteriskSCF::Operations::OperationContextCachePtr mOperationCache;
 };
 
 class SetAndGetSessionControllerTask : public QueuedTask
 {
 public:
-    SetAndGetSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
-                                   const SessionCollectionPtr& sessionCollection,
-                                   const SessionSeq& sessions,
-                                   const BridgeImplPtr& bridge,
-                                   const Logger& logger):
+    SetAndGetSessionControllerTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const Ice::ObjectAdapterPtr& adapter,
+        const SessionCollectionPtr& sessionCollection,
+        const SessionSeq& sessions,
+        const BridgeImplPtr& bridge,
+        const Logger& logger):
         QueuedTask("SetAndGetSessionControllerTask"),
+        mRootContext(rootContext),
         mAdapter(adapter),
         mSessionManager(sessionCollection),
         mSessions(sessions),
@@ -913,12 +1130,16 @@ protected:
             }
             tasksDispatched = true;
             SessionControllerPtr controller = new BridgeSessionController(mBridge, session, mLogger);
-	        std::string identity = (*i)->ice_getIdentity().name;
-	        identity += ".bridgecontroller";
-	        SessionControllerPrx controllerPrx =
-		    SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
-	        (*i)->begin_setAndGetSessionController(controllerPrx, newCallback_Session_setAndGetSessionController(this,
-                                                   &SetAndGetSessionControllerTask::get, &SetAndGetSessionControllerTask::failed), session);
+            std::string identity = (*i)->ice_getIdentity().name;
+            identity += ".bridgecontroller";
+            SessionControllerPrx controllerPrx =
+                SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
+            
+            (*i)->begin_setAndGetSessionController(
+                calculateOperationContext(mRootContext, identityToString((*i)->ice_getIdentity())), 
+                controllerPrx, 
+                newCallback_Session_setAndGetSessionController(this, &SetAndGetSessionControllerTask::get, 
+                    &SetAndGetSessionControllerTask::failed), session);
         }
         return !tasksDispatched;
     }
@@ -949,7 +1170,7 @@ protected:
             // We want to make sure that session is laying around. The session collection will
             // take care of cleaning it up as long as it is marked as destroyed.
             //
-            session->destroy();
+            session->destroy(mRootContext);
         }
         catch(...)
         {
@@ -969,6 +1190,7 @@ protected:
     }
     
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     Ice::ObjectAdapterPtr mAdapter;
     SessionCollectionPtr mSessionManager;
     SessionSeq mSessions;
@@ -980,10 +1202,13 @@ private:
 class RemoveSessionControllerTask : public QueuedTask
 {
 public:
-    RemoveSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
+    RemoveSessionControllerTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const Ice::ObjectAdapterPtr& adapter,
 	const SessionsTrackerPtr& sessions,
 	const Logger& logger):
         QueuedTask("RemoveSessionControllerTask"),
+        mRootContext(rootContext),
         mAdapter(adapter),
         mSessions(sessions),
         mLogger(logger)
@@ -1011,13 +1236,14 @@ protected:
 
             // Remove the session controller from the session itself
             SessionControllerPrx controller = SessionControllerPrx::uncheckedCast(mAdapter->createProxy(
-                                                                                      mAdapter->getCommunicator()->stringToIdentity(identity)));
-            (*i)->removeSessionController(controller);
+                    mAdapter->getCommunicator()->stringToIdentity(identity)));
+            (*i)->removeSessionController(calculateOperationContext(mRootContext, identity), controller);
         }
         return true;
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     Ice::ObjectAdapterPtr mAdapter;
     SessionsTrackerPtr mSessions;
     Logger mLogger;
@@ -1026,51 +1252,60 @@ private:
 class UnplugMedia : public QueuedTask
 {
 public:
-    UnplugMedia(const BridgeImplPtr& bridge) :
+    UnplugMedia(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge) :
         QueuedTask("UnplugMedia"),
+        mRootContext(rootContext),
         mBridge(bridge)
-        {
-        }
+    {
+    }
 
 protected:
     bool executeImpl()
     {
-        UnplugMediaOperation op;
+        UnplugMediaOperation op(mRootContext);
         mBridge->getSessions()->visitSessions(op);
         return true;
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
 };
 
 class SetupMedia : public QueuedTask
 {
 public:
-    SetupMedia(const BridgeImplPtr& bridge) :
+    SetupMedia(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge) :
         QueuedTask("SetupMedia"),
+        mRootContext(rootContext),
         mBridge(bridge)
-        {
-        }
-
+    {
+    }
+    
 protected:
     bool executeImpl()
     {
-        SetupMediaOperation op;
+        SetupMediaOperation op(mRootContext);
         mBridge->getSessions()->visitSessions(op);
         return true;
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
 };
 
 class AddToListeners : public QueuedTask
 {
 public:
-    AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker,
+    AddToListeners(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker,
         const BridgeCookies& cookies) :
         QueuedTask("AddToListeners"),
+        mRootContext(rootContext),
         mListeners(listeners),
         mTracker(tracker),
         mCookies(cookies)
@@ -1080,11 +1315,12 @@ public:
 protected:
     bool executeImpl()
     {
-        mListeners->sessionsAdded(mTracker->getSessions(), mCookies);
+        mListeners->sessionsAdded(mRootContext, mTracker->getSessions(), mCookies);
         return true;
     }
     
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeListenerMgrPtr mListeners;
     SessionsTrackerPtr mTracker;
     BridgeCookies mCookies;
@@ -1105,8 +1341,21 @@ protected:
     {
         if (mBridge->getSessions()->size() < 2 && mPrx)
         {
-            mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
-                            &CheckShutdown::failed));
+            try
+            {
+                mPrx->begin_shutdown(AsteriskSCF::Operations::createContext(),
+                    newCallback_Bridge_shutdown(this, &CheckShutdown::done,
+                        &CheckShutdown::failed));
+            }
+            catch (const Ice::Exception&)
+            {
+                //
+                // This is okay.
+                //
+            }
+            //
+            // Everything else is not.
+            //
         }
         //
         // We don't care about the result really. The CheckShutdown instance will hang
@@ -1135,47 +1384,14 @@ private:
     BridgePrx mPrx;
 };
 
-template <class T>
-class GenericAMDCallback : public QueuedTask
-{
-public:
-    GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
-        QueuedTask("GenericAMDCallback"),
-        mCallback(cb),
-        mTracker(tracker)
-    {
-    }
-protected:
-
-    bool executeImpl()
-    {
-        mCallback->ice_response();
-        return true;
-    }
-
-    void failImpl()
-    {
-        SessionErrorSeq errors(mTracker->getExceptions());
-        if (!errors.empty())
-        {
-            mCallback->ice_exception(BridgeSessionOperationFailed(errors));
-        }
-        else
-        {
-            mCallback->ice_exception();
-        }
-    }
-
-private:
-    T mCallback;
-    SessionsTrackerPtr mTracker;
-};
 
 class UpdateTask : public QueuedTask
 {
 public:
-    UpdateTask(const BridgeImplPtr& bridge) :
+    UpdateTask(const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge) :
         QueuedTask("UpdateTask"),
+        mRootContext(rootContext),
         mBridge(bridge)
     {
     }
@@ -1187,16 +1403,20 @@ protected:
         return true;
     }
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
 };
 
 class ConnectTelephonyEventsTask: public QueuedTask
 {
 public:
-    ConnectTelephonyEventsTask(const BridgeImplPtr& bridge,
-                               const SessionSeq& newSessions,
-                               const Logger& logger) 
-       : QueuedTask("ConnectTelephonyEventsTask"), 
+    ConnectTelephonyEventsTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge,
+        const SessionSeq& newSessions,
+        const Logger& logger) 
+       : QueuedTask("ConnectTelephonyEventsTask"),
+         mRootContext(rootContext),
          mBridge(bridge), 
          mNewSessions(newSessions), 
          mLogger(logger) 
@@ -1227,7 +1447,7 @@ protected:
                 continue;
             }
 
-            ConnectTelephonyOperation op(telephonySession, ignoreList, mLogger);
+            ConnectTelephonyOperation op(mRootContext, telephonySession, ignoreList, mLogger);
             mBridge->getSessions()->visitSessions(op);
 
             // Since this session is now connected to all others in the bridge (including
@@ -1238,6 +1458,7 @@ protected:
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionSeq mNewSessions;
     Logger mLogger;
@@ -1246,10 +1467,13 @@ private:
 class DisconnectTelephonyEventsTask: public QueuedTask
 {
 public:
-    DisconnectTelephonyEventsTask(const BridgeImplPtr& bridge,
-                               const SessionSeq& disconnectingSessions,
-                               const Logger& logger) 
-       : QueuedTask("DisconnectTelephonyEventsTask"), 
+    DisconnectTelephonyEventsTask(
+        const AsteriskSCF::System::V1::OperationContextPtr& rootContext,
+        const BridgeImplPtr& bridge,
+        const SessionSeq& disconnectingSessions,
+        const Logger& logger) 
+       : QueuedTask("DisconnectTelephonyEventsTask"),
+         mRootContext(rootContext),
          mBridge(bridge), 
          mDisconnectingSessions(disconnectingSessions), 
          mLogger(logger) 
@@ -1276,14 +1500,16 @@ protected:
                 continue;
             }
 
-            DisconnectTelephonyOperation op(telephonySession, mLogger);
+            DisconnectTelephonyOperation op(mRootContext, telephonySession, mLogger);
             mBridge->getSessions()->visitSessions(op);
         }
 
         /**
-         * In ConnectTelephonyEventsTask, we could assume all the sessions were in the bridge's session collection
-         * and just use a visitor. But when a session is removed, AMI operations are involved. We use an 
-         * extra pass over the set of sessions being removed, and disconnect each one from the other, to be safe. 
+         * In ConnectTelephonyEventsTask, we could assume all the sessions were
+         * in the bridge's session collection and just use a visitor. But when a
+         * session is removed, AMI operations are involved. We use an extra pass
+         * over the set of sessions being removed, and disconnect each one from
+         * the other, to be safe.
          */
         disconnectMembers();
 
@@ -1348,24 +1574,62 @@ protected:
         for(TelephonyEventSourceSeq::iterator i=fromSources.begin();   
             i != fromSources.end(); ++i)
         {
-                (*i)->removeSinks(sinksToRemove);
+            (*i)->removeSinks(calculateOperationContext(mRootContext, identityToString((*i)->ice_getIdentity())),
+                sinksToRemove);
         }
     }
 
 private:
+    AsteriskSCF::System::V1::OperationContextPtr mRootContext;
     BridgeImplPtr mBridge;
     SessionSeq mDisconnectingSessions;
     Logger mLogger;
 };
 
 
-} // End of anonymous namespace
-
-BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter, 
-        const vector<BridgeListenerPrx>& listeners, 
-        const BridgeListenerMgrPtr& listenerMgr,
-        const ReplicatorSmartPrx& replicator,
-        const BridgeStateItemPtr& state,
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+    GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+        QueuedTask("GenericAMDCallback"),
+        mCallback(cb),
+        mTracker(tracker)
+    {
+    }
+protected:
+
+    bool executeImpl()
+    {
+        mCallback->ice_response();
+        return true;
+    }
+
+    void failImpl()
+    {
+        SessionErrorSeq errors(mTracker->getExceptions());
+        if (!errors.empty())
+        {
+            mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+        }
+        else
+        {
+            mCallback->ice_exception();
+        }
+    }
+
+private:
+    T mCallback;
+    SessionsTrackerPtr mTracker;
+};
+
+} // End of anonymous namespace
+
+BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter, 
+        const vector<BridgeListenerPrx>& listeners, 
+        const BridgeListenerMgrPtr& listenerMgr,
+        const ReplicatorSmartPrx& replicator,
+        const BridgeStateItemPtr& state,
         const Logger& logger) :
     mActivated(false),
     mState(state),
@@ -1375,7 +1639,8 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
     mListeners(listenerMgr),
     mReplicator(replicator),
     mSessionListener(createSessionListener(mSessions, logger)),
-    mLogger(logger)
+    mLogger(logger),
+    mOperationContextCache(AsteriskSCF::Operations::OperationContextCache::create(60))
 {
     mLogger(Trace) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
     for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin(); 
@@ -1397,20 +1662,24 @@ BridgeImpl::~BridgeImpl()
 /** 
  * Process an updated ConnectedLine record from a session. 
  */
-void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, const ConnectedLinePtr& connectedLine)
+void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, 
+    const AsteriskSCF::System::V1::OperationContextPtr& context, const ConnectedLinePtr& connectedLine)
 {
-     try
+    //
+    // TODO: Is it correct to simply ignore any exceptions that occur here?
+    //
+    try
     {
         QueuedTasks tasks;
 
         // Updates the cached ConnectedLine party id information for the given session.
         //  - Applies receive hooks on the received ConnectedLine info before caching.
-        tasks.push_back(new UpdateConnectedLineTask(this, sourceSession->getSession(), connectedLine, mLogger));
+        tasks.push_back(new UpdateConnectedLineTask(context, this, sourceSession->getSession(), connectedLine, mLogger));
 
 
         // Forwards the ConnectedLine information to the other sessions in the bridge.
         //  - Applies forwarding hooks to the cached ConnectedLine info before forwarding.
-        tasks.push_back(new ForwardConnectedLineTask(this, sourceSession->getSession(), mLogger));
+        tasks.push_back(new ForwardConnectedLineTask(context, this, sourceSession->getSession(), mLogger));
 
         ExecutorPtr runner(new Executor(tasks, mLogger));
         runner->start();
@@ -1424,7 +1693,8 @@ void BridgeImpl::updateConnectedLine(const SessionWrapperPtr& sourceSession, con
 /** 
  * Process an updated Redirections record from a session. 
  */
-void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, const RedirectionsPtr& redirections)
+void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, 
+    const AsteriskSCF::System::V1::OperationContextPtr& context, const RedirectionsPtr& redirections)
 {
     try
     {
@@ -1432,7 +1702,8 @@ void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, cons
 
         // Forwards the Redirections information to the other sessions in the bridge.
         // Applies forwarding hooks to the Redirecrting info before forwarding.
-        tasks.push_back(new ForwardRedirectionsUpdatedTask(this, sourceSession->getSession(), redirections, mLogger));
+        tasks.push_back(new ForwardRedirectionsUpdatedTask(context, this, sourceSession->getSession(),
+                redirections, mLogger));
         ExecutorPtr runner(new Executor(tasks, mLogger));
         runner->start();
     }
@@ -1442,134 +1713,163 @@ void BridgeImpl::updateRedirections(const SessionWrapperPtr& sourceSession, cons
     }
 }
 
-static SessionPrx extractSession(const SessionWithSessionInfo& swsi)
+SessionPrx extractSession(const SessionWithSessionInfo& swsi)
 {
     return swsi.sessionProxy;
 }
 
+typedef AMDContextData<AMD_Bridge_addSessionsPtr> AddSessionsContextData;
+
 void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback,
-        const SessionWithSessionInfoSeq& sessionInfos,
-        const Ice::Current&)
+    const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const SessionWithSessionInfoSeq& sessionInfos,
+    const Ice::Current&)
 {
-    try
+    AddSessionsContextData::ptr_type data(
+        getContext<AddSessionsContextData>(
+            mOperationContextCache, operationContext, callback));
+
+    if (data)
     {
-        mSessions->reap();
-        SessionSeq sessions;
-        sessions.resize(sessionInfos.size());
-        std::transform(sessionInfos.begin(), sessionInfos.end(), sessions.begin(), extractSession);
-        if (sessions.empty())
+        try
         {
-            if (callback)
+            mSessions->reap();
+            SessionSeq sessions;
+            sessions.resize(sessionInfos.size());
+            std::transform(sessionInfos.begin(), sessionInfos.end(), sessions.begin(), extractSession);
+            if (sessions.empty())
             {
-                callback->ice_response();
+                if (callback)
+                {
+                    data->getProxy()->ice_response();
+                }
+                return;
+            }
+            checkSessions(sessions);
+            statePreCheck();
+            mLogger(Trace) << FUNLOG << ": adding " << sessions.size() << " sessions";
+            SessionsTrackerPtr tracker(new SessionsTracker);
+            
+            QueuedTasks tasks;
+            tasks.push_back(new UnplugMedia(operationContext, this));
+            tasks.push_back(new SetBridgeTask(operationContext, mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+            tasks.push_back(new AddToListeners(operationContext, mListeners, tracker, getCookies()));
+            tasks.push_back(new SetAndGetSessionControllerTask(operationContext,
+                    mObjAdapter, mSessions, sessions, this, mLogger));
+            
+            //
+            // The new sessions being added to the bridge need to have their connected line set and
+            // forwarded to the existing sessions.
+            //
+            // TODO: This is very much like the SessionOperations which are
+            // applied with the "visit" operation.
+            //
+            for (SessionWithSessionInfoSeq::const_iterator infoIter = sessionInfos.begin();
+                 infoIter != sessionInfos.end(); ++infoIter)
+            {
+                if (infoIter->info && infoIter->info->sessionOwner)
+                {
+                    tasks.push_back(new UpdateConnectedLineTask(operationContext, this, infoIter->sessionProxy,
+                            new ConnectedLine(infoIter->info->sessionOwner->ids), mLogger));
+                    tasks.push_back(new ForwardConnectedLineTask(operationContext, this, infoIter->sessionProxy, mLogger));
+                }
             }
-            return;
-        }
-        checkSessions(sessions);
-        statePreCheck();
-        mLogger(Trace) << FUNLOG << ": adding " << sessions.size() << " sessions";
-
-        SessionsTrackerPtr tracker(new SessionsTracker);
-        QueuedTasks tasks;
-        tasks.push_back(new UnplugMedia(this));
-        tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
-        tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
-        tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
 
-        //The new sessions being added to the bridge need to have their connected line set and
-        //forwarded to the existing sessions.
-        for (SessionWithSessionInfoSeq::const_iterator infoIter = sessionInfos.begin(); infoIter != sessionInfos.end(); ++infoIter)
-        {
-            if (infoIter->info && infoIter->info->sessionOwner)
+            //The existing sessions need to have their stored connected line forwarded to the new
+            //sessions
+            SessionSeq existingSessions = getSessions()->getSessionSeq();
+            for (SessionSeq::const_iterator sessionIter = existingSessions.begin();
+                 sessionIter != existingSessions.end(); ++sessionIter)
             {
-                tasks.push_back(new UpdateConnectedLineTask(this, infoIter->sessionProxy, new ConnectedLine(infoIter->info->sessionOwner->ids), mLogger));
-                tasks.push_back(new ForwardConnectedLineTask(this, infoIter->sessionProxy, mLogger));
+                tasks.push_back(new ForwardConnectedLineTask(operationContext, this, *sessionIter, mLogger));
             }
+            
+            tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(data->getProxy(), tracker));
+            tasks.push_back(new SetupMedia(operationContext, this));
+            tasks.push_back(new ConnectTelephonyEventsTask(operationContext, this, sessions, mLogger));
+            tasks.push_back(new UpdateTask(operationContext, this));
+            ExecutorPtr executor(new Executor(tasks, mLogger));
+            executor->start();
+            //
+            // When the operations have all completed, that last task will take care of handling
+            // the callback. It's all left within the try/catch in the event something happens during
+            // task startup.
+            //
         }
-
-        //The existing sessions need to have their stored connected line forwarded to the new
-        //sessions
-        SessionSeq existingSessions = getSessions()->getSessionSeq();
-        for (SessionSeq::const_iterator sessionIter = existingSessions.begin(); sessionIter != existingSessions.end(); ++sessionIter)
+        catch (const std::exception& ex)
         {
-            tasks.push_back(new ForwardConnectedLineTask(this, *sessionIter, mLogger));
+            data->getProxy()->ice_exception(ex);
+        }
+        catch (...)
+        {
+            assert("If we got here, really bad things have happened!" == 0);
+            data->getProxy()->ice_exception();
         }
-
-        tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
-        tasks.push_back(new SetupMedia(this));
-        tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
-        tasks.push_back(new UpdateTask(this));
-        ExecutorPtr executor(new Executor(tasks, mLogger));
-        executor->start();
-        //
-        // When the operations have all completed, that last task will take care of handling
-        // the callback. It's all left withing the try/catch in the event something happens during
-        // task startup.
-        //
-    }
-    catch (const std::exception& ex)
-    {
-        callback->ice_exception(ex);
-    }
-    catch (...)
-    {
-        callback->ice_exception();
     }
 }
 
-void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
-        const Ice::Current&)
+typedef AMDContextData<AMD_Bridge_removeSessionsPtr> RemoveSessionsContextData;
+
+void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback,
+    const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const SessionSeq& sessions,
+    const Ice::Current&)
 {
-    try
+   RemoveSessionsContextData::ptr_type data(
+        getContext<RemoveSessionsContextData>(mOperationContextCache, operationContext, callback));
+    if (data)
     {
-        if (sessions.empty())
+        try
         {
-            callback->ice_response();
-            return;
-        }
-        checkSessions(sessions);
-        statePreCheck();
-        mSessions->reap();
+            if (sessions.empty())
+            {
+                data->getProxy()->ice_response();
+                return;
+            }
+            checkSessions(sessions);
+            statePreCheck();
+            mSessions->reap();
 
-        //
-        // The shutdown of individual sessions are implemented as series of AMI requests. Once initiated,
-        // we allow them to proceed asynchronously and do not concern ourselves with the result.
-        // The logic of shutdown should remove them either because the operations succeeded or
-        // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
-        // is pointless.
-        //
-        SessionsTrackerPtr removed(new SessionsTracker);
-        for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
-        {
-            SessionWrapperPtr session = mSessions->getSession(*i);
-            if (session)
+            //
+            // The shutdown of individual sessions is implemented as a series of AMI requests. Once initiated,
+            // we allow them to proceed asynchronously and do not concern ourselves with the result.
+            // The logic of shutdown should remove them either because the operations succeeded or
+            // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+            // is pointless.
+            //
+            SessionsTrackerPtr removed(new SessionsTracker);
+            for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
             {
-                removed->add(session->getSession());
-                mSessions->removeSession(session->getBridgedSession());
+                SessionWrapperPtr session = mSessions->getSession(*i);
+                if (session)
+                {
+                    removed->add(session->getSession());
+                    mSessions->removeSession(operationContext, session->getBridgedSession());
+                }
             }
-        }
-        QueuedTasks tasks;
+            QueuedTasks tasks;
 
-        BridgeCookies cookies;
+            BridgeCookies cookies;
+            {
+                boost::shared_lock<boost::shared_mutex> lock(mLock);
+                cookies = getCookies();
+            }
+            tasks.push_back(new RemoveSessionsNotify(operationContext, mListeners, removed, cookies));
+            tasks.push_back(new RemoveSessionControllerTask(operationContext, mObjAdapter, removed, mLogger));
+            tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(data->getProxy(), removed));
+            tasks.push_back(new DisconnectTelephonyEventsTask(operationContext, this, sessions, mLogger));
+            tasks.push_back(new CheckShutdown(this, mPrx));
+            ExecutorPtr runner(new Executor(tasks, mLogger));
+            runner->start();
+        }
+        catch (const std::exception& ex)
         {
-            boost::shared_lock<boost::shared_mutex> lock(mLock);
-            cookies = getCookies();
+            data->getProxy()->ice_exception(ex);
+        }
+        catch (...)
+        {
+            assert("If we got here, really bad things have happened!" == 0);
+            data->getProxy()->ice_exception();
         }
-        tasks.push_back(new RemoveSessionsNotify(mListeners, removed, cookies));
-        tasks.push_back(new RemoveSessionControllerTask(mObjAdapter, removed, mLogger));
-        tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
-        tasks.push_back(new DisconnectTelephonyEventsTask(this, sessions, mLogger));
-        tasks.push_back(new CheckShutdown(this, mPrx));
-        ExecutorPtr runner(new Executor(tasks, mLogger));
-        runner->start();
-    }
-    catch (const std::exception& ex)
-    {
-        callback->ice_exception(ex);
-    }
-    catch (...)
-    {
-        callback->ice_exception();
     }
 }
 
@@ -1580,83 +1880,112 @@ SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
     return mSessions->getSessionSeq();
 }
 
-void BridgeImpl::shutdown(const Ice::Current& current)
+void BridgeImpl::shutdown(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const Ice::Current& current)
 {
-    mSessions->reap();
-    //
-    // In an effort to allow some consistency with replicas, the shutdown operation is broken into
-    // two parts. In an asynchronous version, this would probably be a couple of queued tasks.
-    //
+    ContextDataPtr data(checkAndThrow(mOperationContextCache, operationContext));
+    if (data)
+    {
+        try
+        {
+            mSessions->reap();
+            //
+            // In an effort to allow some consistency with replicas, the
+            // shutdown operation is broken into two parts. In an asynchronous
+            // version, this would probably be a couple of queued tasks.
+            //
     
-    //
-    // When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
-    // no other internal locks.
-    //
-    mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
+            //
+            // When shutting down, the bridge makes a copy of its current
+            // state and unlocks, proceeding with no other internal locks.
+            //
+            mLogger(Trace) << FUNLOG << ":" << objectIdFromCurrent(current);
 
-    BridgeStateItemPtr update;
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        if (mState->runningState == ShuttingDown)
+            BridgeStateItemPtr update;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                if (mState->runningState == ShuttingDown)
+                {
+                    mLogger(Trace) << FUNLOG << ": called when shutting down." ;
+                    return;
+                }
+                if (mState->runningState == Destroyed)
+                {
+                    mLogger(Trace) << FUNLOG << ": called when destroyed." ;
+                    throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+                }
+                mState->runningState = ShuttingDown;
+
+                mListeners->stopping(getCookies());
+                update = createUpdate();
+            }
... 6197 lines suppressed ...


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list