[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "retry_deux" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Feb 22 12:13:02 CST 2012


branch "retry_deux" has been updated
       via  84187128f8ea72f78377438284ec6188b4a06489 (commit)
      from  d3261c499f89fd218e21f8b409ab1b7fda22a2ef (commit)

Summary of changes:
 .../BasicRoutingStateReplicationIf.ice             |    3 +-
 src/BasicRoutingStateReplicatorApp.cpp             |    4 +-
 src/ConnectBridgedSessionsOperation.cpp            |    9 +-
 ...nectBridgedSessionsWithDestinationOperation.cpp |   30 +++---
 ...onnectBridgedSessionsWithDestinationOperation.h |    8 +-
 src/EndpointRegistry.cpp                           |  111 ++++++++++++++------
 src/EndpointRegistry.h                             |   29 ++++--
 src/OperationReplicaCache.h                        |   14 ++--
 src/RouteSessionOperation.cpp                      |   33 ++++---
 src/RouteSessionOperation.h                        |    8 +-
 src/RoutingAdmin.cpp                               |    9 +-
 src/RoutingAdmin.h                                 |    4 +-
 src/RoutingServiceEventPublisher.cpp               |   33 ++++--
 src/RoutingServiceEventPublisher.h                 |   19 +++-
 src/RoutingStateReplicatorListener.cpp             |   14 ++--
 src/SessionListener.cpp                            |   34 ++++--
 src/SessionListener.h                              |   23 +++-
 src/SessionRouter.cpp                              |   27 ++++-
 src/SessionRouter.h                                |   15 ---
 src/SessionRouterOperation.cpp                     |    6 +-
 src/SessionRouterOperation.h                       |   24 +++--
 21 files changed, 293 insertions(+), 164 deletions(-)


- Log -----------------------------------------------------------------
commit 84187128f8ea72f78377438284ec6188b4a06489
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed Feb 22 12:13:06 2012 -0600

    Updates for OperationContext and duplicate call detection.

diff --git a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
index bf12701..ef00a45 100644
--- a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
@@ -90,6 +90,7 @@ module V1
     class OperationStateItem extends RoutingStateItem
     {
         string operationId; 
+        string transactionId;
     };
 
     ///////////////////////////////////////////////////////////////////////
@@ -184,7 +185,7 @@ module V1
      * Represents an added endpoint locator. 
      * The key (in the base state item) is the locator id. 
      */
-    class EndpointLocatorState extends RoutingStateItem
+    class EndpointLocatorState extends OperationStateItem
     { 
         AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
         AsteriskSCF::Core::Routing::V1::EndpointLocator *locator;
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 0623ec6..7697713 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -24,7 +24,7 @@
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/Logger/IceLogger.h>
 #include <AsteriskSCF/Replication/StateReplicator.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 #include "BasicRoutingStateReplicationIf.h"
 
 using namespace std;
@@ -140,7 +140,7 @@ void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement,
     genericparams->category = category;
     genericparams->service = service;
     genericparams->id = id;
-    serviceManagement->addLocatorParams(AsteriskSCF::createContext(), genericparams, "");
+    serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), genericparams, "");
 }
 
 /**
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 9e789ca..d830e87 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -59,11 +59,12 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr, 
                                  ConnectBridgedSessionsOp::OperationState>
                                      (cb, 
+                                      operationContext,
                                       context, 
                                       current,
                                       listener,
-                                      ConnectBridgedSessionsOp::STATE_CONNECT,
-                                      operationContext),
+                                      ConnectBridgedSessionsOp::STATE_CONNECT
+                                      ),
            mSessionToReplace(sessionToReplace),
            mBridgedSession(bridgedSession),
            mReplaceSession(replaceSession)
@@ -137,7 +138,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
     // Create a listener for the sessions not being replaced to handle early termination.
     lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
     mListenerManager = SessionListenerManagerPtr(
-          new SessionListenerManager(mSessionContext->adapter, preserveSessions));
+          new SessionListenerManager(mOperationContext, mSessionContext->adapter, preserveSessions));
 
     // Get the bridge for the sessions being moved.
     BridgePrx oldBridge;
@@ -173,7 +174,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
     // We're through listening, and we will probably interfere with the Bridge's functionality if
     // we keep listening.
     lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
-    mListenerManager->getListener()->unregister();
+    mListenerManager->getListener()->unregister(mOperationContext);
 
     // Now replace the sessions.
     try
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 3d00d7b..df77d51 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -17,7 +17,7 @@
 #include <boost/bind.hpp>
 
 #include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
 #include "BasicRoutingStateReplicationIf.h"
 #include "ConnectBridgedSessionsWithDestinationOperation.h"
@@ -115,7 +115,7 @@ public:
         RoutingStateItemSeq setItems;
 
         setItems.push_back(item);
-        mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+        mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
         
         // Cache the replicated items.
         mReplicatedState.push_back(item);
@@ -139,7 +139,7 @@ public:
             {
                 // We just completed the entire operation. 
                 // Remove the items that represented this operation's state transitions from the state replicator.
-                mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), mReplicatedState);
+                mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), mReplicatedState);
             }
         }
         catch(...)
@@ -224,11 +224,12 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
                           OperationsManager* const listener)
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, 
                                 ConnectBridgedSessionsWithDestinationOp::OperationState>(cb, 
+                                operationContext,
                                 context, 
                                 current,
                                 listener,
-                                ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
-                                operationContext),
+                                ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP
+                                ),
         mSessionToReplace(sessionToReplace),
         mDestination(destination),
         mReplaceSession(replaceSession),
@@ -272,10 +273,13 @@ ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDest
 /**
  * Constructor to service replicas. 
  */
-ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context)
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(
+    const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const SessionContextPtr& context)
         : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, 
                                 ConnectBridgedSessionsWithDestinationOp::OperationState>
-                                (context, 
+                                (operationContext,
+                                 context, 
                                  ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP)
 {
     initStateMachine();
@@ -285,9 +289,9 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
  * Factory for replica objects. 
  */
 ConnectBridgedSessionsWithDestinationOperationPtr 
-      ConnectBridgedSessionsWithDestinationOperation::createReplica(const SessionContextPtr& context)
+      ConnectBridgedSessionsWithDestinationOperation::createReplica(const OperationContextPtr& operationContext, const SessionContextPtr& sessionContext)
 {
-    ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(context));
+    ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(operationContext, sessionContext));
     return op;
 }
 
@@ -322,7 +326,7 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
 {
     mSessionToReplace = item->sessionToReplace;
     mDestination = item->destination;
-    mOperationContext = new OperationContext(item->operationId);
+    mOperationContext = new OperationContext(item->operationId, item->transactionId);
     mReplaceSession = item->replaceSession;
 
     mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
@@ -384,7 +388,7 @@ bool ConnectBridgedSessionsWithDestinationOperation::fastForwardReplica()
 
 void ConnectBridgedSessionsWithDestinationOperation::addListenerManager()
 {
-    mListenerManager.reset(new SessionListenerManager(mSessionContext->adapter, this->mSessionToReplace));
+    mListenerManager.reset(new SessionListenerManager(mOperationContext, mSessionContext->adapter, this->mSessionToReplace));
 }
 
 /**
@@ -488,7 +492,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
 
     // We're through listening, and we will probably interfere with the Bridge's functionality if
     // we keep listening.
-    mListenerManager->getListener()->unregister();
+    mListenerManager->getListener()->unregister(mOperationContext);
 
     // Modify the bridge
     try
@@ -515,7 +519,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
 
     try
     {
-        forwardStart(newSessions);
+        forwardStart(mOperationContext, newSessions);
     }
     catch (const Ice::Exception &e)
     {
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index 4b3183a..9231401 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -91,7 +91,9 @@ public:
     /**
      * Factory method for replica objects. 
      */
-    static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(const SessionContextPtr& context);
+    static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionContextPtr& context);
 
     /**
      * Update a replica object with new state information.
@@ -122,7 +124,9 @@ protected:
                         OperationsManager* const listener);
 
     // Constructor to service replicas.
-    ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context);
+    ConnectBridgedSessionsWithDestinationOperation(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionContextPtr& context);
 
 private:
     void initStateMachine();
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index b7dce0f..be6b387 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -19,7 +19,8 @@
 
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "RoutingServiceEventPublisher.h"
 #include "EndpointRegistry.h"
@@ -34,6 +35,8 @@ using namespace ::AsteriskSCF::Core::Routing::V1::Event;
 using namespace ::AsteriskSCF::BasicRoutingService;
 using namespace ::AsteriskSCF::Replication::BasicRoutingService::V1;
 using namespace ::AsteriskSCF::Discovery;
+using namespace ::AsteriskSCF::System::V1;
+using namespace ::AsteriskSCF::Operations;
 
 namespace
 {
@@ -86,7 +89,8 @@ public:
                          const RoutingReplicationContextPtr& replicationContext) :
             mScriptProcessor(scriptProcessor), 
             mEventPublisher(eventPublisher),
-            mReplicationContext(replicationContext)
+            mReplicationContext(replicationContext),
+            mOperationContextCache(new OperationContextCache(180))
     {
     }
 
@@ -123,8 +127,13 @@ public:
     /**
      * Forwards the result of processing a remove endpoint operation. 
      */
-    void forwardRemoveEndpointLocator(const std::string& locatorId, Event::OperationResult result)
+    void forwardRemoveEndpointLocator(const OperationContextPtr& operationContext,
+        const std::string& locatorId, 
+        Event::OperationResult result)
     {
+        // Forward to event publisher.
+        mEventPublisher->removeEndpointLocatorEvent(operationContext, locatorId, result);
+
         if (!mReplicationContext->isReplicating())
         {
             return;
@@ -143,26 +152,27 @@ public:
                 removeItems.push_back(addEndpointItem);
 
                 lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state removal for locator " << locatorId;
-                mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), removeItems);
+                mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
             }
             catch(const Ice::Exception& e)
             {
                  lg(Debug) << "EndpointRegistry unable to replicate removeEndpointLocator(): " << e.what();
             }
         }
-
-        // Forward to event publisher.
-        mEventPublisher->removeEndpointLocatorEvent(locatorId, result);
     }
 
     /**
      * Forwards the result of processing a remove endpoint operation. 
      */
-    void forwardAddEndpointLocator(const std::string& locatorId,
+    void forwardAddEndpointLocator(const OperationContextPtr& operationContext,
+                                   const std::string& locatorId,
                                    const RegExSeq& regexList,
                                    const EndpointLocatorPrx& locator, 
                                    Event::OperationResult result)
     {
+        // Forward to event publisher.
+        mEventPublisher->addEndpointLocatorEvent(operationContext, locatorId, regexList, locator, result);
+
         if (!mReplicationContext->isReplicating())
         {
             return;
@@ -184,7 +194,7 @@ public:
                 setItems.push_back(addEndpointItem);
 
                 lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state update for new locator " << locatorId;
-                mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+                mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
 
             }
             catch(const Ice::Exception& e)
@@ -193,15 +203,16 @@ public:
             }
         }
 
-        // Forward to event publisher.
-        mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, result);
     }
 
-    void forwardEndpointLocatorDestIdChange(const std::string& locatorId,
+    void forwardEndpointLocatorDestIdChange(const OperationContextPtr& operationContext,
+                                            const std::string& locatorId,
                                             const RegExSeq& regexList, 
                                             const EndpointLocatorPrx& locator, 
                                             OperationResult result)
     {
+        mEventPublisher->setEndpointLocatorDestinationIdsEvent(operationContext, locatorId, regexList, result);
+
         if (!mReplicationContext->isReplicating())
         {
             return;
@@ -220,7 +231,7 @@ public:
                     removeItem->key = locatorId;
                     removeItems.push_back(removeItem);
 
-                    mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), removeItems);
+                    mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
 
                     // Now add the item with the new values. 
                     RoutingStateItemSeq setItems;
@@ -231,7 +242,7 @@ public:
 
                     setItems.push_back(addEndpointItem);
 
-                    mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+                    mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
                 }
             }
             catch(const Ice::Exception& e)
@@ -239,8 +250,6 @@ public:
                 lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
             }
         }
-
-        mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
     }
 
     /**
@@ -260,6 +269,7 @@ public:
     EndpointLocatorMap mEndpointLocatorMap;
     const RoutingEventsPtr mEventPublisher;
     RoutingReplicationContextPtr mReplicationContext;
+    OperationContextCachePtr mOperationContextCache;
 };
 
 /**
@@ -440,11 +450,21 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
  *   @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
  *     the locator being added supports.
  */
-void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const RegExSeq& regexList, const EndpointLocatorPrx& locator,
+void EndpointRegistry::addEndpointLocator(
+    const OperationContextPtr& operationContext,
+    const std::string& locatorId, 
+    const RegExSeq& regexList, 
+    const EndpointLocatorPrx& locator,
     const Ice::Current&)
 {
     try
     {
+        if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+        {
+            lg(Debug) << "EndpointRegistry::addEndpointLocator() detected and ignoring duplicate call for operation " << operationContext->id;
+            return;
+        }
+
         lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString();
 
         EndpointLocatorMapIterator existing;
@@ -459,12 +479,12 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
         RegisteredLocator newLocator(locator, regexList);
         mImpl->insertLocatorMapItem(locatorId, newLocator);
 
-        mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::SUCCESS);
+        mImpl->forwardAddEndpointLocator(operationContext, locatorId, regexList, locator, Event::SUCCESS);
     }
     catch (...)
     {
         lg(Error) << "Exception adding EndpointLocator.";
-        mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::FAILURE);
+        mImpl->forwardAddEndpointLocator(operationContext, locatorId, regexList, locator, Event::FAILURE);
         return;
     }
 }
@@ -473,10 +493,16 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
  * Remove an EndpointLocator.
  *   @param The unique id of the locator to remove.
  */
-void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const Ice::Current&)
+void EndpointRegistry::removeEndpointLocator(const OperationContextPtr& operationContext, const std::string& locatorId, const Ice::Current&)
 {
     try
     {
+        if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+        {
+            lg(Debug) << "EndpointRegistry::removeEndpointLocator() detected and ignoring duplicate call for operation " << operationContext->id;
+            return;
+        }
+
         lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
 
         EndpointLocatorMapIterator existing;
@@ -485,19 +511,19 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
         if (!exists)
         {
             lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
-            mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
+            mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::FAILURE);
             return;
         }
 
         mImpl->eraseLocatorMapItem(locatorId);
 
-        mImpl->forwardRemoveEndpointLocator(locatorId, Event::SUCCESS);
+        mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::SUCCESS);
 
         lg(Info) << "Removed Endpoint Locator with Id = " << locatorId;
     }
     catch(const std::exception &e)
     {
-        mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
+        mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::FAILURE);
         lg(Error) << e.what();
     }
 }
@@ -508,27 +534,36 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
  *   @param A list of reqular expressions that define the the valid endpoint ids. This
  *     set of regular expressions completely replaces the current set.
  */
-void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locatorId,
-    const AsteriskSCF::Core::Routing::V1::RegExSeq& regExList, const Ice::Current&)
+void EndpointRegistry::setEndpointLocatorDestinationIds(
+    const OperationContextPtr& operationContext,
+    const std::string& locatorId,
+    const AsteriskSCF::Core::Routing::V1::RegExSeq& regExList, 
+    const Ice::Current&)
 {
     try
     {
+        if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+        {
+            lg(Debug) << "EndpointRegistry::setEndpointLocatorDestinationIds() detected and ignoring duplicate call for operation " << operationContext->id;
+            return;
+        }
+
         EndpointLocatorMapIterator existing;
         bool exists = mImpl->locatorExists(locatorId, existing);
 
         if (!exists)
         {
-            mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, 0, Event::FAILURE);
+            mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, 0, Event::FAILURE);
             throw DestinationNotFoundException(locatorId);
         }
 
         // Replace the regular expression.
         existing->second.setRegEx(regExList);
-        mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, existing->second.locator, Event::SUCCESS);
+        mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, existing->second.locator, Event::SUCCESS);
     }
     catch(const std::exception &e)
     {
-        mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, 0, Event::FAILURE);
+        mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, 0, Event::FAILURE);
         lg(Error) << "Exception modifying the destination specifications for EndpointLocator " << locatorId;
         lg(Error) << "   - " << e.what();
     }
@@ -546,10 +581,16 @@ void EndpointRegistry::setScriptProcessor(const ScriptProcessorPtr& scriptProces
  * Drop references to all EndpointLocators that have been registered.
  * Note: Admin function.
  */
-void EndpointRegistry::clearEndpointLocators()
+void EndpointRegistry::clearEndpointLocators(const OperationContextPtr& operationContext)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+    {
+        lg(Debug) << "EndpointRegistry::clearEndpointLocators() detected and ignoring duplicate call for operation " << operationContext->id;
+        return;
+    }
+
     mImpl->clearEndpointLocatorMap();
-    mImpl->mEventPublisher->clearEndpointLocatorsEvent();
+    mImpl->mEventPublisher->clearEndpointLocatorsEvent(operationContext);
 }
 
 /**
@@ -558,10 +599,16 @@ void EndpointRegistry::clearEndpointLocators()
  * Note: Admin function.
  *    @param policy A site-specific policy specification.
  */
-void EndpointRegistry::setPolicy(const std::string& policy)
+void EndpointRegistry::setPolicy(const OperationContextPtr& operationContext, const std::string& policy)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+    {
+        lg(Debug) << "EndpointRegistry::setPolicy() detected and ignoring duplicate call for operation " << operationContext->id;
+        return;
+    }
+
     mImpl->mScriptProcessor->setPolicy(policy);
-    mImpl->mEventPublisher->setPolicyEvent(policy);
+    mImpl->mEventPublisher->setPolicyEvent(operationContext, policy);
 }
 
 } // end BasicRoutingService
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index e1413c5..5d0ba55 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -53,15 +53,22 @@ public:
      *   @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
      *     the locator being added supports.
      */
-    void addEndpointLocator(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
-        const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator, const Ice::Current&);
+    void addEndpointLocator(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const std::string& locatorId, 
+        const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+        const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator, 
+        const Ice::Current&);
 
     /**
      * Remove an EndpointLocator from the registry. The EndpointLocator must have been previously added
      * via a call to addEndpointLocator.
      *   @param The unique id of the locator to remove.
      */
-    void removeEndpointLocator(const std::string& locatorId, const Ice::Current& );
+    void removeEndpointLocator(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const std::string& locatorId, 
+        const Ice::Current& );
 
 
     /**
@@ -70,7 +77,10 @@ public:
      *   @param A list of reqular expressions that define the the valid endpoint ids. This
      *     set of regular expressions completely replaces the current set.
      */
-    void setEndpointLocatorDestinationIds(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+    void setEndpointLocatorDestinationIds(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const std::string& locatorId, 
+        const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
         const Ice::Current&);
 
     // EndpointLocator overrides
@@ -79,21 +89,22 @@ public:
      * Returns the endpoints that match the specified destination id.
      *   @param id String identifier of the the destination.
      */
-    virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
-
-public:
+    void lookup_async(
+        const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, 
+        const ::std::string& destination, 
+        const ::Ice::Current&);
 
     /**
      * Drop references to all EndpointLocators that have been registered.
      */
-    void clearEndpointLocators();
+    void clearEndpointLocators(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
 
     /**
      * Sends a policy string to the script processor. The default implementation is a no-op,
      * but site-specific scripts may make use it.
      *    @param policy A site-specific policy specification.
      */
-    void setPolicy(const std::string& policy);
+    void setPolicy(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::string& policy);
 
 
 private:
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index 740c66c..a3ba15f 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -40,7 +40,7 @@ class SessionContext;
 typedef std::map<std::string, AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
 
 /** 
- * For each transaction id, we're going to cache the all the state items for the operation.
+ * For each operation id, we're going to cache the all the state items for the operation.
  * The reason for this is that we don't want to rely on the order in which we will receive the state updates.
  * This class is used to hold all the state updates for a given operation. 
  * 
@@ -75,7 +75,7 @@ public:
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
 
-        // See if this transaction is in the cache.
+        // See if this operation is in the cache.
         typename OpMapType::iterator i = mReplicas.find(item->operationId);
         if (i ==  mReplicas.end())
         {
@@ -92,7 +92,7 @@ public:
         // If we haven't created the replica yet, do so now. 
         if ((*i).second.mOperation.get() == 0)
         {
-            boost::shared_ptr<O> workPtr(O::createReplica(mSessionContext));
+            boost::shared_ptr<O> workPtr(O::createReplica(new OperationContext(item->operationId,item->transactionId), mSessionContext));
             (*i).second.mOperation = workPtr;
         }
 
@@ -100,11 +100,11 @@ public:
         (*i).second.mOperation->reflectUpdate(item);
     }
 
-    bool fetchOperation(std::string transactionId, boost::shared_ptr<O>& outRef)
+    bool fetchOperation(std::string operationId, boost::shared_ptr<O>& outRef)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
 
-        typename OpMapType::iterator i = mReplicas.find(transactionId);
+        typename OpMapType::iterator i = mReplicas.find(operationId);
         if (i == mReplicas.end())
         {
             return false;
@@ -118,11 +118,11 @@ public:
         return true;
     }
 
-    void dropOperation(std::string transactionId)
+    void dropOperation(std::string operationId)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
 
-        typename OpMapType::iterator i = mReplicas.find(transactionId);
+        typename OpMapType::iterator i = mReplicas.find(operationId);
         if (i != mReplicas.end())
         {
             mReplicas.erase(i);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 2ebb8da..67526b1 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -16,7 +16,7 @@
 #include <boost/shared_ptr.hpp>
 #include <boost/bind.hpp>
 
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/Component/TestContext.h>
 
@@ -112,7 +112,7 @@ public:
         RoutingStateItemSeq setItems;
 
         setItems.push_back(item);
-        mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+        mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
         
         // Cache the replication state items.
         mReplicatedState.push_back(item);
@@ -137,7 +137,7 @@ public:
             {
                 // We just completed the entire operation. 
                 // Remove the items that represented this operation's state transitions.
-                mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), mReplicatedState);
+                mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), mReplicatedState);
             }
         }
         catch(...)
@@ -224,11 +224,12 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
                                              OperationsManager* const listener) 
         : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, 
                                   RouteSessionOp::OperationState> (cb, 
+                                                                 operationContext,
                                                                  context, 
                                                                  current,
                                                                  listener,
-                                                                 RouteSessionOp::STATE_LOOKUP,
-                                                                 operationContext),
+                                                                 RouteSessionOp::STATE_LOOKUP
+                                                                 ),
            mSource(source),
            mDestination(destination),
            mHook(oneShotHook),
@@ -275,9 +276,11 @@ RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_r
 /**
  * Alternate constructor for replicas. 
  */
-RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionContext) 
-                : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, 
-                  RouteSessionOp::OperationState>(sessionContext, RouteSessionOp::STATE_LOOKUP)
+RouteSessionOperation::RouteSessionOperation(
+    const OperationContextPtr& operationContext,
+    const SessionContextPtr& sessionContext) 
+        : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, 
+          RouteSessionOp::OperationState>(operationContext, sessionContext, RouteSessionOp::STATE_LOOKUP)
 {
     initStateMachine();
 }
@@ -285,9 +288,11 @@ RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionCon
 /**
  * This is the factory method for creating a replica of a RouteSessionOperation.
  */
-RouteSessionOperationPtr RouteSessionOperation::createReplica(const SessionContextPtr& sessionContext)
+RouteSessionOperationPtr RouteSessionOperation::createReplica(
+    const OperationContextPtr& operationContext,
+    const SessionContextPtr& sessionContext)
 {
-    RouteSessionOperationPtr op (new RouteSessionOperation(sessionContext));
+    RouteSessionOperationPtr op (new RouteSessionOperation(operationContext, sessionContext));
 
     return op;
 }
@@ -318,7 +323,7 @@ void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicR
     mDestination = item->destination;
     mHook = item->hook;
     mCallerID = item->callerID;
-    mOperationContext = new OperationContext(item->operationId);
+    mOperationContext = new OperationContext(item->operationId, item->transactionId);
 
     mReplicatedStates.push_back(RouteSessionOp::STATE_LOOKUP);
 }
@@ -393,7 +398,7 @@ RouteSessionOperation::~RouteSessionOperation()
  */
 void RouteSessionOperation::addListenerManager()
 {
-    mListenerManager.reset(new SessionListenerManager(mSessionContext->adapter, mSource));
+    mListenerManager.reset(new SessionListenerManager(mOperationContext, mSessionContext->adapter, mSource));
 }
 
 /**
@@ -494,7 +499,7 @@ void RouteSessionOperation::establishBridgeState()
 
     // We're through listening, and we will probably interfere with the
     // Bridge's functionality if we keep listening.
-    mListenerManager->getListener()->unregister();
+    mListenerManager->getListener()->unregister(mOperationContext);
 
     // Create the bridge
     BridgePrx bridge;
@@ -522,7 +527,7 @@ void RouteSessionOperation::establishBridgeState()
 
     try
     {
-        forwardStart(newSessions);
+        forwardStart(mOperationContext, newSessions);
     }
     catch (const Ice::Exception &e)
     {
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 3afd3df..a900541 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -87,7 +87,9 @@ public:
     /**
      * Factory method for replica objects. 
      */
-    static RouteSessionOperationPtr createReplica(const SessionContextPtr& context);
+    static RouteSessionOperationPtr createReplica(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionContextPtr& context);
 
     /**
      * Update a replica object with new state information.
@@ -114,7 +116,9 @@ protected:
                           OperationsManager* const listener);
 
    // Constructor for replicas.
-   RouteSessionOperation(const SessionContextPtr& context);
+   RouteSessionOperation(
+       const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+       const SessionContextPtr& context);
 
 private:
     void initStateMachine();
diff --git a/src/RoutingAdmin.cpp b/src/RoutingAdmin.cpp
index 1974426..eba84e4 100644
--- a/src/RoutingAdmin.cpp
+++ b/src/RoutingAdmin.cpp
@@ -20,6 +20,7 @@
 
 using namespace AsteriskSCF::Core::Routing::V1;
 using namespace std;
+using namespace AsteriskSCF::System::V1;
 
 namespace AsteriskSCF
 {
@@ -34,11 +35,11 @@ RoutingAdmin::RoutingAdmin(const EndpointRegistryPtr& endpointRegistry) :
 /**
  * Drop references to all EndpointLocators that have been registered.
  */
-void RoutingAdmin::clearEndpointLocators(const Ice::Current&)
+void RoutingAdmin::clearEndpointLocators(const OperationContextPtr& operationContext, const Ice::Current&)
 {
     // For now we just forward to the registry. Some type of authentication may be required
     // in the future, or perhaps the access to the interface is controlled externally.
-    mEndpointRegistry->clearEndpointLocators();
+    mEndpointRegistry->clearEndpointLocators(operationContext);
 }
 
 /**
@@ -46,11 +47,11 @@ void RoutingAdmin::clearEndpointLocators(const Ice::Current&)
  * but site-specific scripts may make use it.
  *    @param policy A site-specific policy specification.
  */
-void RoutingAdmin::setPolicy(const std::string& policy, const Ice::Current&)
+void RoutingAdmin::setPolicy(const OperationContextPtr& operationContext, const std::string& policy, const Ice::Current&)
 {
     // For now we just forward to the registry. Some type of authentication may be required
     // in the future, or perhaps the access to the interface is controlled externally.
-    mEndpointRegistry->setPolicy(policy);
+    mEndpointRegistry->setPolicy(operationContext, policy);
 }
 
 } // end BasicRoutingService
diff --git a/src/RoutingAdmin.h b/src/RoutingAdmin.h
index 9cf679a..a023ec3 100644
--- a/src/RoutingAdmin.h
+++ b/src/RoutingAdmin.h
@@ -37,14 +37,14 @@ public:  // RoutingServiceAdmin overrides
     /**
      * Drop references to all EndpointLocators that have been registered.
      */
-    void clearEndpointLocators(const ::Ice::Current&);
+    void clearEndpointLocators(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const ::Ice::Current&);
 
     /**
      * Sends a policy string to the script processor. The default implementation is a no-op,
      * but site-specific scripts may make use it.
      *    @param policy A site-specific policy specification.
      */
-    void setPolicy(const ::std::string& policy, const ::Ice::Current&);
+    void setPolicy(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const ::std::string& policy, const ::Ice::Current&);
 
 private:
     EndpointRegistryPtr mEndpointRegistry;
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index eba8bef..a14f5f1 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -19,6 +19,8 @@
 #include <IceStorm/IceStorm.h>
 
 #include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+
 #include <boost/thread/mutex.hpp>
 #include "RoutingServiceEventPublisher.h"
 
@@ -26,6 +28,7 @@ using namespace ::std;
 using namespace ::AsteriskSCF::Core::Routing::V1;
 using namespace ::AsteriskSCF::System::Logging;
 using namespace ::AsteriskSCF::BasicRoutingService;
+using namespace ::AsteriskSCF::System::V1;
 
 namespace
 {
@@ -152,7 +155,8 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
 /**
  * Send a message to the service's event topic to report a lookup event.
  */
-void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
+void RoutingServiceEventPublisher::lookupEvent(
+    const std::string& destination,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
     if (!mImpl->isInitialized())
@@ -174,7 +178,8 @@ void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
 /**
  * Send a message to the service's event topic to report the addEndpointLocator event.
  */
-void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::addEndpointLocatorEvent(const OperationContextPtr& operationContext,
+    const std::string& locatorId,
     const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
     const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator, 
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
@@ -187,7 +192,7 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
 
     try
     {
-        mImpl->mEventTopic->addEndpointLocatorEvent(locatorId, regexList, locator, result);
+        mImpl->mEventTopic->addEndpointLocatorEvent(AsteriskSCF::Operations::createContext(operationContext), locatorId, regexList, locator, result);
     }
     catch(const Ice::Exception &e)
     {
@@ -198,7 +203,8 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
 /**
  * Send a message to the service's event topic to report the removeEndpointLocator event.
  */
-void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const OperationContextPtr& operationContext,
+    const std::string& locatorId,
     AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
 {
     if (!mImpl->isInitialized())
@@ -208,7 +214,7 @@ void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string&
 
     try
     {
-        mImpl->mEventTopic->removeEndpointLocatorEvent(locatorId, result);
+        mImpl->mEventTopic->removeEndpointLocatorEvent(AsteriskSCF::Operations::createContext(operationContext), locatorId, result);
     }
     catch(const Ice::Exception &e)
     {
@@ -219,7 +225,9 @@ void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string&
 /**
  * Send a message to the service's event topic to report the setEndpointLocatorDestinationIds event.
  */
-void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(
+    const OperationContextPtr& operationContext,
+    const std::string& locatorId,
     const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
     const Ice::Current &)
 {
@@ -230,7 +238,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
 
     try
     {
-        mImpl->mEventTopic->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, result);
+        mImpl->mEventTopic->setEndpointLocatorDestinationIdsEvent(AsteriskSCF::Operations::createContext(operationContext), locatorId, regexList, result);
     }
     catch(const Ice::Exception &e)
     {
@@ -241,7 +249,8 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
 /**
  * Send a message to the service's event topic to report the clearEndpointLocators event.
  */
-void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
+void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const OperationContextPtr& operationContext,
+    const Ice::Current &)
 {
     if (!mImpl->isInitialized())
     {
@@ -250,7 +259,7 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
 
     try
     {
-        mImpl->mEventTopic->clearEndpointLocatorsEvent();
+        mImpl->mEventTopic->clearEndpointLocatorsEvent(AsteriskSCF::Operations::createContext(operationContext));
     }
     catch(const Ice::Exception &e)
     {
@@ -261,7 +270,9 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
 /**
  * Send a message to the service's event topic to report the setPolicy event.
  */
-void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
+void RoutingServiceEventPublisher::setPolicyEvent(const OperationContextPtr& operationContext,
+    const std::string& policy, 
+    const Ice::Current &)
 {
     if (!mImpl->isInitialized())
     {
@@ -270,7 +281,7 @@ void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, con
 
     try
     {
-        mImpl->mEventTopic->setPolicyEvent(policy);
+        mImpl->mEventTopic->setPolicyEvent(AsteriskSCF::Operations::createContext(operationContext), policy);
     }
     catch(const Ice::Exception &e)
     {
diff --git a/src/RoutingServiceEventPublisher.h b/src/RoutingServiceEventPublisher.h
index 059b6ad..77158e6 100644
--- a/src/RoutingServiceEventPublisher.h
+++ b/src/RoutingServiceEventPublisher.h
@@ -44,7 +44,9 @@ public:
      *  @param destination The destination to be looked up.
      *  @param result Informs event listeners of the operations success or failure.
      */
-    void lookupEvent(const std::string& destination, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
+    void lookupEvent(
+        const std::string& destination, 
+        AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
         const Ice::Current&);
 
     /**
@@ -53,7 +55,8 @@ public:
      *  @param regexList List of regex strings used to identify the destinations available by this locator.
      *  @param result Informs event listeners of the operations success or failure.
      */
-    void addEndpointLocatorEvent(const std::string& locatorId, 
+    void addEndpointLocatorEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+                                 const std::string& locatorId, 
                                  const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, 
                                  const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
                                  AsteriskSCF::Core::Routing::V1::Event::OperationResult result, 
@@ -64,7 +67,8 @@ public:
      *  @param locatorId The identity of the EndpointLocator being removed.
      *  @param result Informs event listeners of the operations success or failure.
      */
-    void removeEndpointLocatorEvent(const std::string& locatorId, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
+    void removeEndpointLocatorEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const std::string& locatorId, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
         const Ice::Current&);
 
     /**
@@ -73,19 +77,22 @@ public:
      *  @param regexList New list of regex strings to be used to identify the destinations available by this locator.
      *  @param result Informs event listeners of the operations success or failure.
      */
-    void setEndpointLocatorDestinationIdsEvent(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+    void setEndpointLocatorDestinationIdsEvent(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext, 
+        const std::string& locatorId, 
+        const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
         AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current&);
 
 
     /**
      * Send a message to the service's event topic to report the clearEndpointLocators event.
      */
-    void clearEndpointLocatorsEvent(const Ice::Current&);
+    void clearEndpointLocatorsEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const Ice::Current&);
 
     /**
      * Send a message to the service's event topic to report the setPolicy event.
      */
-    void setPolicyEvent(const std::string& policy, const Ice::Current&);
+    void setPolicyEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::string& policy, const Ice::Current&);
 
 private:
     boost::shared_ptr<RoutingServiceEventPublisherPriv> mImpl; // pimpl idiom applied.
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index c4a27b6..59e1257 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -19,7 +19,7 @@
 #include <boost/thread.hpp>
 
 #include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Helpers/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "RoutingStateReplicatorListener.h"
 #include "OperationReplicaCache.h"
@@ -30,7 +30,7 @@ using namespace AsteriskSCF::BasicRoutingService;
 using namespace AsteriskSCF::Replication::BasicRoutingService::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::V1;
-using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
 
 namespace
 {
@@ -147,7 +147,7 @@ public:
 
             void visitEndpointLocatorState(const EndpointLocatorStatePtr& item)
             {
-                mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
+                mImpl->mEndpointRegistry->removeEndpointLocator(new OperationContext(item->operationId, item->transactionId), item->key, ::Ice::Current());
             }
 
         }; // end method-local visitor def
@@ -260,7 +260,7 @@ public:
 
             void visitEndpointLocatorState(const EndpointLocatorStatePtr& item)
             {
-                mImpl->mEndpointRegistry->addEndpointLocator(item->key, item->regExList, item->locator, mCurrent);
+                mImpl->mEndpointRegistry->addEndpointLocator(new OperationContext(item->operationId, item->transactionId), item->key, item->regExList, item->locator, mCurrent);
             }
 
         }; // end method-local visitor def
@@ -297,7 +297,7 @@ void RoutingStateReplicatorListenerImpl::stateRemoved(
     // Is this a retry for an operation we're already processing?
     if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
     {
-        lg(Debug) << "Retry of previously processed stateRemoved() operation detected and rejected for operation " << operationContext->id;
+        lg(Debug) << "Duplicate stateRemoved() operation detected and rejected for operation " << operationContext->id;
         return;
     }
 
@@ -312,7 +312,7 @@ void RoutingStateReplicatorListenerImpl::stateRemovedForItems(
     // Is this a retry for an operation we're already processing?
     if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
     {
-        lg(Debug) << "Retry of previously processed stateRemovedForItems() operation detected and rejected for operation " << operationContext->id;
+        lg(Debug) << "Duplicate stateRemovedForItems() operation detected and rejected for operation " << operationContext->id;
         return;
     }
 
@@ -327,7 +327,7 @@ void RoutingStateReplicatorListenerImpl::stateSet(
     // Is this a retry for an operation we're already processing?
     if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
     {
-        lg(Debug) << "Retry of previously processed stateSet() operation detected and rejected for operation " << operationContext->id;
+        lg(Debug) << "Duplicate stateSet() operation detected and rejected for operation " << operationContext->id;
         return;
     }
 
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index b5736c8..26e3b39 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -18,6 +18,7 @@
 #include <boost/thread/thread.hpp>
 
 #include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
 #include "SessionRouter.h"
 #include "SessionListener.h"
@@ -26,6 +27,7 @@ using namespace AsteriskSCF;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
 
 namespace
 {
@@ -74,7 +76,7 @@ void SessionListenerImpl::indicated(const OperationContextPtr& operationContext,
         {
             if (session->ice_getIdentity() != (*s)->ice_getIdentity())
             {
-                (*s)->stop(stoppedIndication->response);
+                (*s)->stop(createContext(operationContext), stoppedIndication->response);
             }
         }
         catch(const Ice::Exception &e)
@@ -94,7 +96,7 @@ void SessionListenerImpl::addSession(const SessionPrx& session)
 /**
  * Add a session to be tracked by this listener, and attach this listener to the session.
  */
-void SessionListenerImpl::addSessionAndListen(SessionPrx session)
+void SessionListenerImpl::addSessionAndListen(const OperationContextPtr& operationContext, SessionPrx session)
 {
     {   // critical scope
         boost::unique_lock<boost::shared_mutex> lock(mSessionLock);
@@ -106,7 +108,7 @@ void SessionListenerImpl::addSessionAndListen(SessionPrx session)
         try
         {
             lg(Debug) << "Adding listener to session." ;
-            session->addListener(mListenerPrx);
+            session->addListener(createContext(operationContext), mListenerPrx);
         }
         catch(const Ice::Exception &e)
         {
@@ -129,7 +131,7 @@ bool SessionListenerImpl::isTerminated() // Lots of shoring up to do for asynchr
 /**
  * Stop listening to all sessions we're monitoring.
  */
-void SessionListenerImpl::unregister()
+void SessionListenerImpl::unregister(const OperationContextPtr& operationContext)
 {
     SessionSeq sessionsToCall;
     {   // critical scope
@@ -144,7 +146,7 @@ void SessionListenerImpl::unregister()
         try
         {
             lg(Debug) << "Removing listener from session " << (*s)->ice_toString();
-            (*s)->removeListener(mListenerPrx);
+            (*s)->removeListener(createContext(operationContext), mListenerPrx);
         }
         catch(const Ice::Exception &e)
         {
@@ -172,9 +174,13 @@ void SessionListenerImpl::setProxy(const SessionListenerPrx& prx)
 /** 
  * Constructor for SessionListenerManager.
  */
-SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const SessionPrx& session)
+SessionListenerManager::SessionListenerManager(
+    const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const Ice::ObjectAdapterPtr& adapter, 
+    const SessionPrx& session)
     : mSessionListener(new SessionListenerImpl()),
-      mAdapter(adapter)
+      mAdapter(adapter),
+      mOperationContext(operationContext)
 {
     Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
 
@@ -183,7 +189,7 @@ SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adap
         SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
         mSessionListener->setProxy(listenerProxy);
 
-        mSessionListener->addSessionAndListen(session);
+        mSessionListener->addSessionAndListen(operationContext, session);
     }
     catch(...)
     {
@@ -200,9 +206,13 @@ SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adap
 /** 
  * Constructor for SessionListenerManager.
  */
-SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const SessionSeq& sessionSequence)
+SessionListenerManager::SessionListenerManager(
+    const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+    const Ice::ObjectAdapterPtr& adapter, 
+    const SessionSeq& sessionSequence)
     : mSessionListener(new SessionListenerImpl()),
-      mAdapter(adapter)
+      mAdapter(adapter),
+      mOperationContext(operationContext)
 {
     Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
 
@@ -213,7 +223,7 @@ SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adap
 
         for(SessionSeq::const_iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
         {
-            mSessionListener->addSessionAndListen(*s);
+            mSessionListener->addSessionAndListen(operationContext, *s);
         }
     }
     catch(...)
@@ -237,7 +247,7 @@ SessionListenerManager::~SessionListenerManager()
     {
         lg(Debug) << "About to unregister the listener..." ;
 
-        mSessionListener->unregister();
+        mSessionListener->unregister(mOperationContext);
     }
     catch(const std::exception& e)
     {
diff --git a/src/SessionListener.h b/src/SessionListener.h
index 42b2286..4996b3b 100644
--- a/src/SessionListener.h
+++ b/src/SessionListener.h
@@ -50,19 +50,23 @@ public: // Impl operations
     /**
      * Adds a session to be tracked by the listener. 
      */
-    void addSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+    void addSession(
+        const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
 
     /**
      * Add a session to be tracked by this listener, and attach this listener to the session.
      */
-    void addSessionAndListen(AsteriskSCF::SessionCommunications::V1::SessionPrx session);
+    void addSessionAndListen(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        AsteriskSCF::SessionCommunications::V1::SessionPrx session);
+
     size_t getNumSessions();
     bool isTerminated();
 
     /**
      * Stop listening to all sessions we're monitoring.
      */
-    void unregister();
+    void unregister(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
     AsteriskSCF::SessionCommunications::V1::SessionListenerPrx getProxy();
     void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& prx);
 
@@ -83,8 +87,15 @@ typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
 class SessionListenerManager
 {
 public:
-    SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
-    SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessionSequence);
+    SessionListenerManager(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const Ice::ObjectAdapterPtr& adapter, 
+        const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+    SessionListenerManager(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const Ice::ObjectAdapterPtr& adapter, 
+        const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessionSequence);
 
     ~SessionListenerManager();
     SessionListenerImpl* getListener() const;
@@ -92,7 +103,7 @@ public:
 private:
     SessionListenerImpl* mSessionListener;
     Ice::ObjectAdapterPtr mAdapter;
-
+    AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
 }; // class SessionListenerManager
 
 typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index de69d74..79d8851 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -27,6 +27,7 @@
 #include <AsteriskSCF/Core/Routing/RoutingIf.h>
 #include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
 #include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
 
 #include "SessionRouter.h"
 #include "RouteSessionOperation.h"
@@ -48,6 +49,7 @@ using namespace AsteriskSCF::SessionCommunications::ExtensionPoints::V1;
 using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
 using namespace AsteriskSCF::Core::Routing::V1;
 using namespace AsteriskSCF::Threading;
+using namespace AsteriskSCF::Operations;
 using namespace std;
 
 namespace
@@ -139,6 +141,7 @@ private:
 
 public:
     SessionContextPtr mSessionContext;
+    OperationContextCachePtr mOperationContextCache;
     boost::shared_ptr<AsteriskSCF::BasicRoutingService::OperationReplicaCache> mOperationReplicaCache;
     OperationMap mOngoingOperations;
     boost::mutex mLock;
@@ -171,15 +174,21 @@ void SessionRouter::routeSession_async(const AMD_SessionRouter_routeSessionPtr&
                                        const RedirectionsPtr& redirects,
                                        const ::Ice::Current& current)
 {
-    // Check the cache for a replica with this transaction Id.
+    if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+    {
+        lg(Debug) << "Duplicate routeSession() operation detected and rejected for operation " << operationContext->id;
+        return;
+    }
+
+    // Check the replica cache for a replica with this transaction Id.
     RouteSessionOperationPtr routeSessionOp;
     if (mImpl->mOperationReplicaCache->getRouteSessionCache()->fetchOperation(operationContext->id, routeSessionOp))
     {
         routeSessionOp->rehostReplica(cb, current, mImpl.get());
         WorkPtr replicaOp(routeSessionOp);
 
-         mImpl->scheduleOperation(replicaOp);
-         return;
+        mImpl->scheduleOperation(replicaOp);
+        return;
     }
 
     WorkPtr op(RouteSessionOperation::create(cb, 
@@ -214,6 +223,12 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const AMD_Sessio
                                                                 const SessionCreationHookPrx& oneShotHook,
                                                                 const ::Ice::Current& current)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+    {
+        lg(Debug) << "Duplicate connectBridgedSessionsWithDestination() operation detected and rejected for operation " << operationContext->id; 
+        return;
+    }
+
     // Check the cache for a replica with this transaction Id.
     ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
     if (mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->fetchOperation(operationContext->id, connectBridgedSessionsWithDestOp))
@@ -248,6 +263,12 @@ void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCom
                                                  bool replaceSession,
                                                  const ::Ice::Current& current)
 {
+    if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+    {
+        lg(Debug) << "Duplicate connectBridgedSessions() operation detected and rejected for operation " << operationContext->id; 
+        return;
+    }
+
     WorkPtr op(ConnectBridgedSessionsOperation::create(cb, 
                                                        operationContext,
                                                        sessionToReplace, 
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index 3edca53..fd70b3f 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -51,11 +51,6 @@ public:
      *   @param source The session initiating the routing event.
      *   @param destination The address or id of the destination to be routed.
      */
-    void routeSession(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
-                      const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
-                      const std::string& destination,
-                      const Ice::Current&);
-
     virtual void routeSession_async
                    (const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
                     const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
@@ -73,11 +68,6 @@ public:
      *    obtained via an accessor on this interface.
      *   @param destination The address or id of a destination to be used as a replacement for the specified session.
      */
-    void connectBridgedSessionsWithDestination(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
-                                               const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
-                                               const ::std::string& destination,
-                                               const Ice::Current&);
-
     virtual void connectBridgedSessionsWithDestination_async
                    (const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb, 
                     const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
@@ -95,11 +85,6 @@ public:
      *    obtained via an accessor on this interface.
      *   @param newSession The session to be used as a replacement for the specified session.
      */
-    void connectBridgedSessions(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
-                                const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
-                                const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
-                                const Ice::Current&);
-
     virtual void connectBridgedSessions_async
                    (const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb, 
                     const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
diff --git a/src/SessionRouterOperation.cpp b/src/SessionRouterOperation.cpp
index bb13127..49ab64a 100644
--- a/src/SessionRouterOperation.cpp
+++ b/src/SessionRouterOperation.cpp
@@ -18,6 +18,7 @@
 
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/Helpers/Retry.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
 
 #include "SessionRouterOperation.h"
 
@@ -26,6 +27,7 @@ using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::Core::Endpoint::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
 
 namespace
 {
@@ -44,11 +46,11 @@ namespace BasicRoutingService
  * Forward the start() operation to all sessions in a given sequence. 
  * Caller should catch Ice::Exception
  */
-void forwardStart(SessionSeq& sessions)
+void forwardStart(const OperationContextPtr& operationContext, SessionSeq& sessions)
 {
     for (SessionSeq::iterator s = sessions.begin(); s != sessions.end(); ++s)
     {
-        (*s)->start(); // Caller should catch Ice::Exception
+        (*s)->start(createContext(operationContext)); // Caller should catch Ice::Exception
     }
 }
 
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index dcc732f..06bc96e 100644
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@ -113,35 +113,39 @@ public:
     /**
      * Constructor. 
      *  @param amdCallback The callback object to provide results to the initiator of this operation.
+     *  @param operationContext Unique ID for this operation as assigned by the caller. 
      *  @param context The SessionContext provides references to key objects needed by each operation. 
      *  @param manager 
      *  @param defaultState The initial state of the operation's state machine.
-     *  @param operationContext Unique ID for this operation as assigned by the caller. 
      */
     SessionRouterOperation(const T& amdCallback,
+                           const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
                            const SessionContextPtr& context,
                            const ::Ice::Current& current,
                            OperationsManager* manager, 
-                           S defaultState,
-                           const AsteriskSCF::System::V1::OperationContextPtr& operationContext) 
+                           S defaultState) 
         : mInitiatorCallback(amdCallback),
+          mOperationContext(operationContext),
           mSessionContext(context),
           mIceCurrent(current),
           mFinished(false),
           mOperationsManager(manager),
-          mStateMachine(defaultState),
-          mOperationContext(operationContext)
+          mStateMachine(defaultState)
     {
     }
 
     /**
      * Constructor for inactive replicas. 
+     *  @param operationContext Unique ID for this operation as assigned by the caller. 
      *  @param context The SessionContext provides references to key objects needed by each operation. 
      *  @param defaultState The initial state of the operation's state machine.
      */
-    SessionRouterOperation(const SessionContextPtr& context,
-                           S defaultState) 
-        : mSessionContext(context),
+    SessionRouterOperation(
+        const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+        const SessionContextPtr& context,
+        S defaultState) 
+        : mOperationContext(operationContext),
+          mSessionContext(context),
           mFinished(false),
           mStateMachine(defaultState)
     {
@@ -301,6 +305,7 @@ public:
 
 protected:
     T mInitiatorCallback;
+    AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
     SessionContextPtr mSessionContext;
     ::Ice::Current mIceCurrent;
 
@@ -308,7 +313,6 @@ protected:
     SessionListenerManagerPtr mListenerManager;
     OperationsManager* mOperationsManager;
     AsteriskSCF::StateMachine::SimpleStateMachine<S> mStateMachine;
-    AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
 
     AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
 
@@ -361,7 +365,7 @@ private:
 /**
  * Forward the start() operation to all sessions in a given sequence. 
  */
-void forwardStart(AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+void forwardStart(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
 
 /**
  * Provide access to the bridge for a given session. 

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/routing.git



More information about the asterisk-scf-commits mailing list