[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "retry_deux" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Feb 22 12:13:02 CST 2012
branch "retry_deux" has been updated
via 84187128f8ea72f78377438284ec6188b4a06489 (commit)
from d3261c499f89fd218e21f8b409ab1b7fda22a2ef (commit)
Summary of changes:
.../BasicRoutingStateReplicationIf.ice | 3 +-
src/BasicRoutingStateReplicatorApp.cpp | 4 +-
src/ConnectBridgedSessionsOperation.cpp | 9 +-
...nectBridgedSessionsWithDestinationOperation.cpp | 30 +++---
...onnectBridgedSessionsWithDestinationOperation.h | 8 +-
src/EndpointRegistry.cpp | 111 ++++++++++++++------
src/EndpointRegistry.h | 29 ++++--
src/OperationReplicaCache.h | 14 ++--
src/RouteSessionOperation.cpp | 33 ++++---
src/RouteSessionOperation.h | 8 +-
src/RoutingAdmin.cpp | 9 +-
src/RoutingAdmin.h | 4 +-
src/RoutingServiceEventPublisher.cpp | 33 ++++--
src/RoutingServiceEventPublisher.h | 19 +++-
src/RoutingStateReplicatorListener.cpp | 14 ++--
src/SessionListener.cpp | 34 ++++--
src/SessionListener.h | 23 +++-
src/SessionRouter.cpp | 27 ++++-
src/SessionRouter.h | 15 ---
src/SessionRouterOperation.cpp | 6 +-
src/SessionRouterOperation.h | 24 +++--
21 files changed, 293 insertions(+), 164 deletions(-)
- Log -----------------------------------------------------------------
commit 84187128f8ea72f78377438284ec6188b4a06489
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Feb 22 12:13:06 2012 -0600
Updates for OperationContext and duplicate call detection.
diff --git a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
index bf12701..ef00a45 100644
--- a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
@@ -90,6 +90,7 @@ module V1
class OperationStateItem extends RoutingStateItem
{
string operationId;
+ string transactionId;
};
///////////////////////////////////////////////////////////////////////
@@ -184,7 +185,7 @@ module V1
* Represents an added endpoint locator.
* The key (in the base state item) is the locator id.
*/
- class EndpointLocatorState extends RoutingStateItem
+ class EndpointLocatorState extends OperationStateItem
{
AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
AsteriskSCF::Core::Routing::V1::EndpointLocator *locator;
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 0623ec6..7697713 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -24,7 +24,7 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/Replication/StateReplicator.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "BasicRoutingStateReplicationIf.h"
using namespace std;
@@ -140,7 +140,7 @@ void setCategory(const Discovery::V1::ServiceManagementPrx& serviceManagement,
genericparams->category = category;
genericparams->service = service;
genericparams->id = id;
- serviceManagement->addLocatorParams(AsteriskSCF::createContext(), genericparams, "");
+ serviceManagement->addLocatorParams(AsteriskSCF::Operations::createContext(), genericparams, "");
}
/**
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 9e789ca..d830e87 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -59,11 +59,12 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr,
ConnectBridgedSessionsOp::OperationState>
(cb,
+ operationContext,
context,
current,
listener,
- ConnectBridgedSessionsOp::STATE_CONNECT,
- operationContext),
+ ConnectBridgedSessionsOp::STATE_CONNECT
+ ),
mSessionToReplace(sessionToReplace),
mBridgedSession(bridgedSession),
mReplaceSession(replaceSession)
@@ -137,7 +138,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
// Create a listener for the sessions not being replaced to handle early termination.
lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
mListenerManager = SessionListenerManagerPtr(
- new SessionListenerManager(mSessionContext->adapter, preserveSessions));
+ new SessionListenerManager(mOperationContext, mSessionContext->adapter, preserveSessions));
// Get the bridge for the sessions being moved.
BridgePrx oldBridge;
@@ -173,7 +174,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
// We're through listening, and we will probably interfere with the Bridge's functionality if
// we keep listening.
lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
- mListenerManager->getListener()->unregister();
+ mListenerManager->getListener()->unregister(mOperationContext);
// Now replace the sessions.
try
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 3d00d7b..df77d51 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -17,7 +17,7 @@
#include <boost/bind.hpp>
#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "BasicRoutingStateReplicationIf.h"
#include "ConnectBridgedSessionsWithDestinationOperation.h"
@@ -115,7 +115,7 @@ public:
RoutingStateItemSeq setItems;
setItems.push_back(item);
- mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
// Cache the replicated items.
mReplicatedState.push_back(item);
@@ -139,7 +139,7 @@ public:
{
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions from the state replicator.
- mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), mReplicatedState);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), mReplicatedState);
}
}
catch(...)
@@ -224,11 +224,12 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr,
ConnectBridgedSessionsWithDestinationOp::OperationState>(cb,
+ operationContext,
context,
current,
listener,
- ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
- operationContext),
+ ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP
+ ),
mSessionToReplace(sessionToReplace),
mDestination(destination),
mReplaceSession(replaceSession),
@@ -272,10 +273,13 @@ ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDest
/**
* Constructor to service replicas.
*/
-ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context)
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr,
ConnectBridgedSessionsWithDestinationOp::OperationState>
- (context,
+ (operationContext,
+ context,
ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP)
{
initStateMachine();
@@ -285,9 +289,9 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
* Factory for replica objects.
*/
ConnectBridgedSessionsWithDestinationOperationPtr
- ConnectBridgedSessionsWithDestinationOperation::createReplica(const SessionContextPtr& context)
+ ConnectBridgedSessionsWithDestinationOperation::createReplica(const OperationContextPtr& operationContext, const SessionContextPtr& sessionContext)
{
- ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(context));
+ ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(operationContext, sessionContext));
return op;
}
@@ -322,7 +326,7 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
{
mSessionToReplace = item->sessionToReplace;
mDestination = item->destination;
- mOperationContext = new OperationContext(item->operationId);
+ mOperationContext = new OperationContext(item->operationId, item->transactionId);
mReplaceSession = item->replaceSession;
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
@@ -384,7 +388,7 @@ bool ConnectBridgedSessionsWithDestinationOperation::fastForwardReplica()
void ConnectBridgedSessionsWithDestinationOperation::addListenerManager()
{
- mListenerManager.reset(new SessionListenerManager(mSessionContext->adapter, this->mSessionToReplace));
+ mListenerManager.reset(new SessionListenerManager(mOperationContext, mSessionContext->adapter, this->mSessionToReplace));
}
/**
@@ -488,7 +492,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
// We're through listening, and we will probably interfere with the Bridge's functionality if
// we keep listening.
- mListenerManager->getListener()->unregister();
+ mListenerManager->getListener()->unregister(mOperationContext);
// Modify the bridge
try
@@ -515,7 +519,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
try
{
- forwardStart(newSessions);
+ forwardStart(mOperationContext, newSessions);
}
catch (const Ice::Exception &e)
{
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index 4b3183a..9231401 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -91,7 +91,9 @@ public:
/**
* Factory method for replica objects.
*/
- static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(const SessionContextPtr& context);
+ static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
/**
* Update a replica object with new state information.
@@ -122,7 +124,9 @@ protected:
OperationsManager* const listener);
// Constructor to service replicas.
- ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context);
+ ConnectBridgedSessionsWithDestinationOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
private:
void initStateMachine();
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index b7dce0f..be6b387 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -19,7 +19,8 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Discovery/SmartProxy.h>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
@@ -34,6 +35,8 @@ using namespace ::AsteriskSCF::Core::Routing::V1::Event;
using namespace ::AsteriskSCF::BasicRoutingService;
using namespace ::AsteriskSCF::Replication::BasicRoutingService::V1;
using namespace ::AsteriskSCF::Discovery;
+using namespace ::AsteriskSCF::System::V1;
+using namespace ::AsteriskSCF::Operations;
namespace
{
@@ -86,7 +89,8 @@ public:
const RoutingReplicationContextPtr& replicationContext) :
mScriptProcessor(scriptProcessor),
mEventPublisher(eventPublisher),
- mReplicationContext(replicationContext)
+ mReplicationContext(replicationContext),
+ mOperationContextCache(new OperationContextCache(180))
{
}
@@ -123,8 +127,13 @@ public:
/**
* Forwards the result of processing a remove endpoint operation.
*/
- void forwardRemoveEndpointLocator(const std::string& locatorId, Event::OperationResult result)
+ void forwardRemoveEndpointLocator(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ Event::OperationResult result)
{
+ // Forward to event publisher.
+ mEventPublisher->removeEndpointLocatorEvent(operationContext, locatorId, result);
+
if (!mReplicationContext->isReplicating())
{
return;
@@ -143,26 +152,27 @@ public:
removeItems.push_back(addEndpointItem);
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state removal for locator " << locatorId;
- mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), removeItems);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
}
catch(const Ice::Exception& e)
{
lg(Debug) << "EndpointRegistry unable to replicate removeEndpointLocator(): " << e.what();
}
}
-
- // Forward to event publisher.
- mEventPublisher->removeEndpointLocatorEvent(locatorId, result);
}
/**
* Forwards the result of processing an add endpoint operation.
*/
- void forwardAddEndpointLocator(const std::string& locatorId,
+ void forwardAddEndpointLocator(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
const RegExSeq& regexList,
const EndpointLocatorPrx& locator,
Event::OperationResult result)
{
+ // Forward to event publisher.
+ mEventPublisher->addEndpointLocatorEvent(operationContext, locatorId, regexList, locator, result);
+
if (!mReplicationContext->isReplicating())
{
return;
@@ -184,7 +194,7 @@ public:
setItems.push_back(addEndpointItem);
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state update for new locator " << locatorId;
- mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
}
catch(const Ice::Exception& e)
@@ -193,15 +203,16 @@ public:
}
}
- // Forward to event publisher.
- mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, result);
}
- void forwardEndpointLocatorDestIdChange(const std::string& locatorId,
+ void forwardEndpointLocatorDestIdChange(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
const RegExSeq& regexList,
const EndpointLocatorPrx& locator,
OperationResult result)
{
+ mEventPublisher->setEndpointLocatorDestinationIdsEvent(operationContext, locatorId, regexList, result);
+
if (!mReplicationContext->isReplicating())
{
return;
@@ -220,7 +231,7 @@ public:
removeItem->key = locatorId;
removeItems.push_back(removeItem);
- mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), removeItems);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
// Now add the item with the new values.
RoutingStateItemSeq setItems;
@@ -231,7 +242,7 @@ public:
setItems.push_back(addEndpointItem);
- mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
}
}
catch(const Ice::Exception& e)
@@ -239,8 +250,6 @@ public:
lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
}
}
-
- mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
}
/**
@@ -260,6 +269,7 @@ public:
EndpointLocatorMap mEndpointLocatorMap;
const RoutingEventsPtr mEventPublisher;
RoutingReplicationContextPtr mReplicationContext;
+ OperationContextCachePtr mOperationContextCache;
};
/**
@@ -440,11 +450,21 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
* @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
* the locator being added supports.
*/
-void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const RegExSeq& regexList, const EndpointLocatorPrx& locator,
+void EndpointRegistry::addEndpointLocator(
+ const OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const RegExSeq& regexList,
+ const EndpointLocatorPrx& locator,
const Ice::Current&)
{
try
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "EndpointRegistry::addEndpointLocator() detected and ignoring duplicate call for operation " << operationContext->id;
+ return;
+ }
+
lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString();
EndpointLocatorMapIterator existing;
@@ -459,12 +479,12 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
RegisteredLocator newLocator(locator, regexList);
mImpl->insertLocatorMapItem(locatorId, newLocator);
- mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::SUCCESS);
+ mImpl->forwardAddEndpointLocator(operationContext, locatorId, regexList, locator, Event::SUCCESS);
}
catch (...)
{
lg(Error) << "Exception adding EndpointLocator.";
- mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::FAILURE);
+ mImpl->forwardAddEndpointLocator(operationContext, locatorId, regexList, locator, Event::FAILURE);
return;
}
}
@@ -473,10 +493,16 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
* Remove an EndpointLocator.
* @param The unique id of the locator to remove.
*/
-void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const Ice::Current&)
+void EndpointRegistry::removeEndpointLocator(const OperationContextPtr& operationContext, const std::string& locatorId, const Ice::Current&)
{
try
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "EndpointRegistry::removeEndpointLocator() detected and ignoring duplicate call for operation " << operationContext->id;
+ return;
+ }
+
lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
EndpointLocatorMapIterator existing;
@@ -485,19 +511,19 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
if (!exists)
{
lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
- mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
+ mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::FAILURE);
return;
}
mImpl->eraseLocatorMapItem(locatorId);
- mImpl->forwardRemoveEndpointLocator(locatorId, Event::SUCCESS);
+ mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::SUCCESS);
lg(Info) << "Removed Endpoint Locator with Id = " << locatorId;
}
catch(const std::exception &e)
{
- mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
+ mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::FAILURE);
lg(Error) << e.what();
}
}
@@ -508,27 +534,36 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
* @param A list of regular expressions that define the valid endpoint ids. This
* set of regular expressions completely replaces the current set.
*/
-void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locatorId,
- const AsteriskSCF::Core::Routing::V1::RegExSeq& regExList, const Ice::Current&)
+void EndpointRegistry::setEndpointLocatorDestinationIds(
+ const OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regExList,
+ const Ice::Current&)
{
try
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "EndpointRegistry::setEndpointLocatorDestinationIds() detected and ignoring duplicate call for operation " << operationContext->id;
+ return;
+ }
+
EndpointLocatorMapIterator existing;
bool exists = mImpl->locatorExists(locatorId, existing);
if (!exists)
{
- mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, 0, Event::FAILURE);
+ mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, 0, Event::FAILURE);
throw DestinationNotFoundException(locatorId);
}
// Replace the regular expression.
existing->second.setRegEx(regExList);
- mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, existing->second.locator, Event::SUCCESS);
+ mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, existing->second.locator, Event::SUCCESS);
}
catch(const std::exception &e)
{
- mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, 0, Event::FAILURE);
+ mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, 0, Event::FAILURE);
lg(Error) << "Exception modifying the destination specifications for EndpointLocator " << locatorId;
lg(Error) << " - " << e.what();
}
@@ -546,10 +581,16 @@ void EndpointRegistry::setScriptProcessor(const ScriptProcessorPtr& scriptProces
* Drop references to all EndpointLocators that have been registered.
* Note: Admin function.
*/
-void EndpointRegistry::clearEndpointLocators()
+void EndpointRegistry::clearEndpointLocators(const OperationContextPtr& operationContext)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "EndpointRegistry::clearEndpointLocators() detected and ignoring duplicate call for operation " << operationContext->id;
+ return;
+ }
+
mImpl->clearEndpointLocatorMap();
- mImpl->mEventPublisher->clearEndpointLocatorsEvent();
+ mImpl->mEventPublisher->clearEndpointLocatorsEvent(operationContext);
}
/**
@@ -558,10 +599,16 @@ void EndpointRegistry::clearEndpointLocators()
* Note: Admin function.
* @param policy A site-specific policy specification.
*/
-void EndpointRegistry::setPolicy(const std::string& policy)
+void EndpointRegistry::setPolicy(const OperationContextPtr& operationContext, const std::string& policy)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "EndpointRegistry::setPolicy() detected and ignoring duplicate call for operation " << operationContext->id;
+ return;
+ }
+
mImpl->mScriptProcessor->setPolicy(policy);
- mImpl->mEventPublisher->setPolicyEvent(policy);
+ mImpl->mEventPublisher->setPolicyEvent(operationContext, policy);
}
} // end BasicRoutingService
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index e1413c5..5d0ba55 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -53,15 +53,22 @@ public:
* @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
* the locator being added supports.
*/
- void addEndpointLocator(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
- const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator, const Ice::Current&);
+ void addEndpointLocator(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
+ const Ice::Current&);
/**
* Remove an EndpointLocator from the registry. The EndpointLocator must have been previously added
* via a call to addEndpointLocator.
* @param The unique id of the locator to remove.
*/
- void removeEndpointLocator(const std::string& locatorId, const Ice::Current& );
+ void removeEndpointLocator(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const Ice::Current& );
/**
@@ -70,7 +77,10 @@ public:
* @param A list of regular expressions that define the valid endpoint ids. This
* set of regular expressions completely replaces the current set.
*/
- void setEndpointLocatorDestinationIds(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ void setEndpointLocatorDestinationIds(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
const Ice::Current&);
// EndpointLocator overrides
@@ -79,21 +89,22 @@ public:
* Returns the endpoints that match the specified destination id.
* @param id String identifier of the destination.
*/
- virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
-
-public:
+ void lookup_async(
+ const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb,
+ const ::std::string& destination,
+ const ::Ice::Current&);
/**
* Drop references to all EndpointLocators that have been registered.
*/
- void clearEndpointLocators();
+ void clearEndpointLocators(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
/**
* Sends a policy string to the script processor. The default implementation is a no-op,
* but site-specific scripts may make use of it.
* @param policy A site-specific policy specification.
*/
- void setPolicy(const std::string& policy);
+ void setPolicy(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::string& policy);
private:
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index 740c66c..a3ba15f 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -40,7 +40,7 @@ class SessionContext;
typedef std::map<std::string, AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
/**
- * For each transaction id, we're going to cache the all the state items for the operation.
+ * For each operation id, we're going to cache all the state items for the operation.
* The reason for this is that we don't want to rely on the order in which we will receive the state updates.
* This class is used to hold all the state updates for a given operation.
*
@@ -75,7 +75,7 @@ public:
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- // See if this transaction is in the cache.
+ // See if this operation is in the cache.
typename OpMapType::iterator i = mReplicas.find(item->operationId);
if (i == mReplicas.end())
{
@@ -92,7 +92,7 @@ public:
// If we haven't created the replica yet, do so now.
if ((*i).second.mOperation.get() == 0)
{
- boost::shared_ptr<O> workPtr(O::createReplica(mSessionContext));
+ boost::shared_ptr<O> workPtr(O::createReplica(new OperationContext(item->operationId,item->transactionId), mSessionContext));
(*i).second.mOperation = workPtr;
}
@@ -100,11 +100,11 @@ public:
(*i).second.mOperation->reflectUpdate(item);
}
- bool fetchOperation(std::string transactionId, boost::shared_ptr<O>& outRef)
+ bool fetchOperation(std::string operationId, boost::shared_ptr<O>& outRef)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- typename OpMapType::iterator i = mReplicas.find(transactionId);
+ typename OpMapType::iterator i = mReplicas.find(operationId);
if (i == mReplicas.end())
{
return false;
@@ -118,11 +118,11 @@ public:
return true;
}
- void dropOperation(std::string transactionId)
+ void dropOperation(std::string operationId)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- typename OpMapType::iterator i = mReplicas.find(transactionId);
+ typename OpMapType::iterator i = mReplicas.find(operationId);
if (i != mReplicas.end())
{
mReplicas.erase(i);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 2ebb8da..67526b1 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -16,7 +16,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
-#include <AsteriskSCF/Helpers/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Component/TestContext.h>
@@ -112,7 +112,7 @@ public:
RoutingStateItemSeq setItems;
setItems.push_back(item);
- mReplicationContext->getReplicator()->setState(AsteriskSCF::createContext(), setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
// Cache the replication state items.
mReplicatedState.push_back(item);
@@ -137,7 +137,7 @@ public:
{
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions.
- mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::createContext(), mReplicatedState);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), mReplicatedState);
}
}
catch(...)
@@ -224,11 +224,12 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
RouteSessionOp::OperationState> (cb,
+ operationContext,
context,
current,
listener,
- RouteSessionOp::STATE_LOOKUP,
- operationContext),
+ RouteSessionOp::STATE_LOOKUP
+ ),
mSource(source),
mDestination(destination),
mHook(oneShotHook),
@@ -275,9 +276,11 @@ RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_r
/**
* Alternate constructor for replicas.
*/
-RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionContext)
- : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
- RouteSessionOp::OperationState>(sessionContext, RouteSessionOp::STATE_LOOKUP)
+RouteSessionOperation::RouteSessionOperation(
+ const OperationContextPtr& operationContext,
+ const SessionContextPtr& sessionContext)
+ : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
+ RouteSessionOp::OperationState>(operationContext, sessionContext, RouteSessionOp::STATE_LOOKUP)
{
initStateMachine();
}
@@ -285,9 +288,11 @@ RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionCon
/**
* This is the factory method for creating a replica of a RouteSessionOperation.
*/
-RouteSessionOperationPtr RouteSessionOperation::createReplica(const SessionContextPtr& sessionContext)
+RouteSessionOperationPtr RouteSessionOperation::createReplica(
+ const OperationContextPtr& operationContext,
+ const SessionContextPtr& sessionContext)
{
- RouteSessionOperationPtr op (new RouteSessionOperation(sessionContext));
+ RouteSessionOperationPtr op (new RouteSessionOperation(operationContext, sessionContext));
return op;
}
@@ -318,7 +323,7 @@ void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicR
mDestination = item->destination;
mHook = item->hook;
mCallerID = item->callerID;
- mOperationContext = new OperationContext(item->operationId);
+ mOperationContext = new OperationContext(item->operationId, item->transactionId);
mReplicatedStates.push_back(RouteSessionOp::STATE_LOOKUP);
}
@@ -393,7 +398,7 @@ RouteSessionOperation::~RouteSessionOperation()
*/
void RouteSessionOperation::addListenerManager()
{
- mListenerManager.reset(new SessionListenerManager(mSessionContext->adapter, mSource));
+ mListenerManager.reset(new SessionListenerManager(mOperationContext, mSessionContext->adapter, mSource));
}
/**
@@ -494,7 +499,7 @@ void RouteSessionOperation::establishBridgeState()
// We're through listening, and we will probably interfere with the
// Bridge's functionality if we keep listening.
- mListenerManager->getListener()->unregister();
+ mListenerManager->getListener()->unregister(mOperationContext);
// Create the bridge
BridgePrx bridge;
@@ -522,7 +527,7 @@ void RouteSessionOperation::establishBridgeState()
try
{
- forwardStart(newSessions);
+ forwardStart(mOperationContext, newSessions);
}
catch (const Ice::Exception &e)
{
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 3afd3df..a900541 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -87,7 +87,9 @@ public:
/**
* Factory method for replica objects.
*/
- static RouteSessionOperationPtr createReplica(const SessionContextPtr& context);
+ static RouteSessionOperationPtr createReplica(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
/**
* Update a replica object with new state information.
@@ -114,7 +116,9 @@ protected:
OperationsManager* const listener);
// Constructor for replicas.
- RouteSessionOperation(const SessionContextPtr& context);
+ RouteSessionOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
private:
void initStateMachine();
diff --git a/src/RoutingAdmin.cpp b/src/RoutingAdmin.cpp
index 1974426..eba84e4 100644
--- a/src/RoutingAdmin.cpp
+++ b/src/RoutingAdmin.cpp
@@ -20,6 +20,7 @@
using namespace AsteriskSCF::Core::Routing::V1;
using namespace std;
+using namespace AsteriskSCF::System::V1;
namespace AsteriskSCF
{
@@ -34,11 +35,11 @@ RoutingAdmin::RoutingAdmin(const EndpointRegistryPtr& endpointRegistry) :
/**
* Drop references to all EndpointLocators that have been registered.
*/
-void RoutingAdmin::clearEndpointLocators(const Ice::Current&)
+void RoutingAdmin::clearEndpointLocators(const OperationContextPtr& operationContext, const Ice::Current&)
{
// For now we just forward to the registry. Some type of authentication may be required
// in the future, or perhaps the access to the interface is controlled externally.
- mEndpointRegistry->clearEndpointLocators();
+ mEndpointRegistry->clearEndpointLocators(operationContext);
}
/**
@@ -46,11 +47,11 @@ void RoutingAdmin::clearEndpointLocators(const Ice::Current&)
* but site-specific scripts may make use of it.
* @param policy A site-specific policy specification.
*/
-void RoutingAdmin::setPolicy(const std::string& policy, const Ice::Current&)
+void RoutingAdmin::setPolicy(const OperationContextPtr& operationContext, const std::string& policy, const Ice::Current&)
{
// For now we just forward to the registry. Some type of authentication may be required
// in the future, or perhaps the access to the interface is controlled externally.
- mEndpointRegistry->setPolicy(policy);
+ mEndpointRegistry->setPolicy(operationContext, policy);
}
} // end BasicRoutingService
diff --git a/src/RoutingAdmin.h b/src/RoutingAdmin.h
index 9cf679a..a023ec3 100644
--- a/src/RoutingAdmin.h
+++ b/src/RoutingAdmin.h
@@ -37,14 +37,14 @@ public: // RoutingServiceAdmin overrides
/**
* Drop references to all EndpointLocators that have been registered.
*/
- void clearEndpointLocators(const ::Ice::Current&);
+ void clearEndpointLocators(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const ::Ice::Current&);
/**
* Sends a policy string to the script processor. The default implementation is a no-op,
* but site-specific scripts may make use of it.
* @param policy A site-specific policy specification.
*/
- void setPolicy(const ::std::string& policy, const ::Ice::Current&);
+ void setPolicy(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const ::std::string& policy, const ::Ice::Current&);
private:
EndpointRegistryPtr mEndpointRegistry;
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index eba8bef..a14f5f1 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -19,6 +19,8 @@
#include <IceStorm/IceStorm.h>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+
#include <boost/thread/mutex.hpp>
#include "RoutingServiceEventPublisher.h"
@@ -26,6 +28,7 @@ using namespace ::std;
using namespace ::AsteriskSCF::Core::Routing::V1;
using namespace ::AsteriskSCF::System::Logging;
using namespace ::AsteriskSCF::BasicRoutingService;
+using namespace ::AsteriskSCF::System::V1;
namespace
{
@@ -152,7 +155,8 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
/**
* Send a message to the service's event topic to report a lookup event.
*/
-void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
+void RoutingServiceEventPublisher::lookupEvent(
+ const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
if (!mImpl->isInitialized())
@@ -174,7 +178,8 @@ void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
/**
* Send a message to the service's event topic to report the addEndpointLocator event.
*/
-void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::addEndpointLocatorEvent(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
@@ -187,7 +192,7 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
try
{
- mImpl->mEventTopic->addEndpointLocatorEvent(locatorId, regexList, locator, result);
+ mImpl->mEventTopic->addEndpointLocatorEvent(AsteriskSCF::Operations::createContext(operationContext), locatorId, regexList, locator, result);
}
catch(const Ice::Exception &e)
{
@@ -198,7 +203,8 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
/**
* Send a message to the service's event topic to report the removeEndpointLocator event.
*/
-void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
if (!mImpl->isInitialized())
@@ -208,7 +214,7 @@ void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string&
try
{
- mImpl->mEventTopic->removeEndpointLocatorEvent(locatorId, result);
+ mImpl->mEventTopic->removeEndpointLocatorEvent(AsteriskSCF::Operations::createContext(operationContext), locatorId, result);
}
catch(const Ice::Exception &e)
{
@@ -219,7 +225,9 @@ void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string&
/**
* Send a message to the service's event topic to report the setEndpointLocatorDestinationIds event.
*/
-void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(
+ const OperationContextPtr& operationContext,
+ const std::string& locatorId,
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
@@ -230,7 +238,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
try
{
- mImpl->mEventTopic->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, result);
+ mImpl->mEventTopic->setEndpointLocatorDestinationIdsEvent(AsteriskSCF::Operations::createContext(operationContext), locatorId, regexList, result);
}
catch(const Ice::Exception &e)
{
@@ -241,7 +249,8 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
/**
* Send a message to the service's event topic to report the clearEndpointLocators event.
*/
-void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
+void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const OperationContextPtr& operationContext,
+ const Ice::Current &)
{
if (!mImpl->isInitialized())
{
@@ -250,7 +259,7 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
try
{
- mImpl->mEventTopic->clearEndpointLocatorsEvent();
+ mImpl->mEventTopic->clearEndpointLocatorsEvent(AsteriskSCF::Operations::createContext(operationContext));
}
catch(const Ice::Exception &e)
{
@@ -261,7 +270,9 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
/**
* Send a message to the service's event topic to report the setPolicy event.
*/
-void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
+void RoutingServiceEventPublisher::setPolicyEvent(const OperationContextPtr& operationContext,
+ const std::string& policy,
+ const Ice::Current &)
{
if (!mImpl->isInitialized())
{
@@ -270,7 +281,7 @@ void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, con
try
{
- mImpl->mEventTopic->setPolicyEvent(policy);
+ mImpl->mEventTopic->setPolicyEvent(AsteriskSCF::Operations::createContext(operationContext), policy);
}
catch(const Ice::Exception &e)
{
diff --git a/src/RoutingServiceEventPublisher.h b/src/RoutingServiceEventPublisher.h
index 059b6ad..77158e6 100644
--- a/src/RoutingServiceEventPublisher.h
+++ b/src/RoutingServiceEventPublisher.h
@@ -44,7 +44,9 @@ public:
* @param destination The destination to be looked up.
* @param result Informs event listeners of the operation's success or failure.
*/
- void lookupEvent(const std::string& destination, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
+ void lookupEvent(
+ const std::string& destination,
+ AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current&);
/**
@@ -53,7 +55,8 @@ public:
* @param regexList List of regex strings used to identify the destinations available by this locator.
* @param result Informs event listeners of the operation's success or failure.
*/
- void addEndpointLocatorEvent(const std::string& locatorId,
+ void addEndpointLocatorEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
@@ -64,7 +67,8 @@ public:
* @param locatorId The identity of the EndpointLocator being removed.
* @param result Informs event listeners of the operation's success or failure.
*/
- void removeEndpointLocatorEvent(const std::string& locatorId, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
+ void removeEndpointLocatorEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current&);
/**
@@ -73,19 +77,22 @@ public:
* @param regexList New list of regex strings to be used to identify the destinations available by this locator.
* @param result Informs event listeners of the operation's success or failure.
*/
- void setEndpointLocatorDestinationIdsEvent(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ void setEndpointLocatorDestinationIdsEvent(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current&);
/**
* Send a message to the service's event topic to report the clearEndpointLocators event.
*/
- void clearEndpointLocatorsEvent(const Ice::Current&);
+ void clearEndpointLocatorsEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const Ice::Current&);
/**
* Send a message to the service's event topic to report the setPolicy event.
*/
- void setPolicyEvent(const std::string& policy, const Ice::Current&);
+ void setPolicyEvent(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::string& policy, const Ice::Current&);
private:
boost::shared_ptr<RoutingServiceEventPublisherPriv> mImpl; // pimpl idiom applied.
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index c4a27b6..59e1257 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -19,7 +19,7 @@
#include <boost/thread.hpp>
#include <AsteriskSCF/Logger.h>
-#include <AsteriskSCF/Helpers/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "RoutingStateReplicatorListener.h"
#include "OperationReplicaCache.h"
@@ -30,7 +30,7 @@ using namespace AsteriskSCF::BasicRoutingService;
using namespace AsteriskSCF::Replication::BasicRoutingService::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::V1;
-using namespace AsteriskSCF::Helpers;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -147,7 +147,7 @@ public:
void visitEndpointLocatorState(const EndpointLocatorStatePtr& item)
{
- mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
+ mImpl->mEndpointRegistry->removeEndpointLocator(new OperationContext(item->operationId, item->transactionId), item->key, ::Ice::Current());
}
}; // end method-local visitor def
@@ -260,7 +260,7 @@ public:
void visitEndpointLocatorState(const EndpointLocatorStatePtr& item)
{
- mImpl->mEndpointRegistry->addEndpointLocator(item->key, item->regExList, item->locator, mCurrent);
+ mImpl->mEndpointRegistry->addEndpointLocator(new OperationContext(item->operationId, item->transactionId), item->key, item->regExList, item->locator, mCurrent);
}
}; // end method-local visitor def
@@ -297,7 +297,7 @@ void RoutingStateReplicatorListenerImpl::stateRemoved(
// Is this a retry for an operation we're already processing?
if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
{
- lg(Debug) << "Retry of previously processed stateRemoved() operation detected and rejected for operation " << operationContext->id;
+ lg(Debug) << "Duplicate stateRemoved() operation detected and rejected for operation " << operationContext->id;
return;
}
@@ -312,7 +312,7 @@ void RoutingStateReplicatorListenerImpl::stateRemovedForItems(
// Is this a retry for an operation we're already processing?
if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
{
- lg(Debug) << "Retry of previously processed stateRemovedForItems() operation detected and rejected for operation " << operationContext->id;
+ lg(Debug) << "Duplicate stateRemovedForItems() operation detected and rejected for operation " << operationContext->id;
return;
}
@@ -327,7 +327,7 @@ void RoutingStateReplicatorListenerImpl::stateSet(
// Is this a retry for an operation we're already processing?
if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
{
- lg(Debug) << "Retry of previously processed stateSet() operation detected and rejected for operation " << operationContext->id;
+ lg(Debug) << "Duplicate stateSet() operation detected and rejected for operation " << operationContext->id;
return;
}
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index b5736c8..26e3b39 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -18,6 +18,7 @@
#include <boost/thread/thread.hpp>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "SessionRouter.h"
#include "SessionListener.h"
@@ -26,6 +27,7 @@ using namespace AsteriskSCF;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -74,7 +76,7 @@ void SessionListenerImpl::indicated(const OperationContextPtr& operationContext,
{
if (session->ice_getIdentity() != (*s)->ice_getIdentity())
{
- (*s)->stop(stoppedIndication->response);
+ (*s)->stop(createContext(operationContext), stoppedIndication->response);
}
}
catch(const Ice::Exception &e)
@@ -94,7 +96,7 @@ void SessionListenerImpl::addSession(const SessionPrx& session)
/**
* Add a session to be tracked by this listener, and attach this listener to the session.
*/
-void SessionListenerImpl::addSessionAndListen(SessionPrx session)
+void SessionListenerImpl::addSessionAndListen(const OperationContextPtr& operationContext, SessionPrx session)
{
{ // critical scope
boost::unique_lock<boost::shared_mutex> lock(mSessionLock);
@@ -106,7 +108,7 @@ void SessionListenerImpl::addSessionAndListen(SessionPrx session)
try
{
lg(Debug) << "Adding listener to session." ;
- session->addListener(mListenerPrx);
+ session->addListener(createContext(operationContext), mListenerPrx);
}
catch(const Ice::Exception &e)
{
@@ -129,7 +131,7 @@ bool SessionListenerImpl::isTerminated() // Lots of shoring up to do for asynchr
/**
* Stop listening to all sessions we're monitoring.
*/
-void SessionListenerImpl::unregister()
+void SessionListenerImpl::unregister(const OperationContextPtr& operationContext)
{
SessionSeq sessionsToCall;
{ // critical scope
@@ -144,7 +146,7 @@ void SessionListenerImpl::unregister()
try
{
lg(Debug) << "Removing listener from session " << (*s)->ice_toString();
- (*s)->removeListener(mListenerPrx);
+ (*s)->removeListener(createContext(operationContext), mListenerPrx);
}
catch(const Ice::Exception &e)
{
@@ -172,9 +174,13 @@ void SessionListenerImpl::setProxy(const SessionListenerPrx& prx)
/**
* Constructor for SessionListenerManager.
*/
-SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const SessionPrx& session)
+SessionListenerManager::SessionListenerManager(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const Ice::ObjectAdapterPtr& adapter,
+ const SessionPrx& session)
: mSessionListener(new SessionListenerImpl()),
- mAdapter(adapter)
+ mAdapter(adapter),
+ mOperationContext(operationContext)
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -183,7 +189,7 @@ SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adap
SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
mSessionListener->setProxy(listenerProxy);
- mSessionListener->addSessionAndListen(session);
+ mSessionListener->addSessionAndListen(operationContext, session);
}
catch(...)
{
@@ -200,9 +206,13 @@ SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adap
/**
* Constructor for SessionListenerManager.
*/
-SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const SessionSeq& sessionSequence)
+SessionListenerManager::SessionListenerManager(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const Ice::ObjectAdapterPtr& adapter,
+ const SessionSeq& sessionSequence)
: mSessionListener(new SessionListenerImpl()),
- mAdapter(adapter)
+ mAdapter(adapter),
+ mOperationContext(operationContext)
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -213,7 +223,7 @@ SessionListenerManager::SessionListenerManager(const Ice::ObjectAdapterPtr& adap
for(SessionSeq::const_iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
{
- mSessionListener->addSessionAndListen(*s);
+ mSessionListener->addSessionAndListen(operationContext, *s);
}
}
catch(...)
@@ -237,7 +247,7 @@ SessionListenerManager::~SessionListenerManager()
{
lg(Debug) << "About to unregister the listener..." ;
- mSessionListener->unregister();
+ mSessionListener->unregister(mOperationContext);
}
catch(const std::exception& e)
{
diff --git a/src/SessionListener.h b/src/SessionListener.h
index 42b2286..4996b3b 100644
--- a/src/SessionListener.h
+++ b/src/SessionListener.h
@@ -50,19 +50,23 @@ public: // Impl operations
/**
* Adds a session to be tracked by the listener.
*/
- void addSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+ void addSession(
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
/**
* Add a session to be tracked by this listener, and attach this listener to the session.
*/
- void addSessionAndListen(AsteriskSCF::SessionCommunications::V1::SessionPrx session);
+ void addSessionAndListen(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ AsteriskSCF::SessionCommunications::V1::SessionPrx session);
+
size_t getNumSessions();
bool isTerminated();
/**
* Stop listening to all sessions we're monitoring.
*/
- void unregister();
+ void unregister(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
AsteriskSCF::SessionCommunications::V1::SessionListenerPrx getProxy();
void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& prx);
@@ -83,8 +87,15 @@ typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
class SessionListenerManager
{
public:
- SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
- SessionListenerManager(const Ice::ObjectAdapterPtr& adapter, const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessionSequence);
+ SessionListenerManager(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const Ice::ObjectAdapterPtr& adapter,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+ SessionListenerManager(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const Ice::ObjectAdapterPtr& adapter,
+ const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessionSequence);
~SessionListenerManager();
SessionListenerImpl* getListener() const;
@@ -92,7 +103,7 @@ public:
private:
SessionListenerImpl* mSessionListener;
Ice::ObjectAdapterPtr mAdapter;
-
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
}; // class SessionListenerManager
typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index de69d74..79d8851 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -27,6 +27,7 @@
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
#include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "SessionRouter.h"
#include "RouteSessionOperation.h"
@@ -48,6 +49,7 @@ using namespace AsteriskSCF::SessionCommunications::ExtensionPoints::V1;
using namespace AsteriskSCF::SessionCommunications::PartyIdentification::V1;
using namespace AsteriskSCF::Core::Routing::V1;
using namespace AsteriskSCF::Threading;
+using namespace AsteriskSCF::Operations;
using namespace std;
namespace
@@ -139,6 +141,7 @@ private:
public:
SessionContextPtr mSessionContext;
+ OperationContextCachePtr mOperationContextCache;
boost::shared_ptr<AsteriskSCF::BasicRoutingService::OperationReplicaCache> mOperationReplicaCache;
OperationMap mOngoingOperations;
boost::mutex mLock;
@@ -171,15 +174,21 @@ void SessionRouter::routeSession_async(const AMD_SessionRouter_routeSessionPtr&
const RedirectionsPtr& redirects,
const ::Ice::Current& current)
{
- // Check the cache for a replica with this transaction Id.
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "Duplicate routeSession() operation detected and rejected for operation " << operationContext->id;
+ return;
+ }
+
+ // Check the replica cache for a replica with this transaction Id.
RouteSessionOperationPtr routeSessionOp;
if (mImpl->mOperationReplicaCache->getRouteSessionCache()->fetchOperation(operationContext->id, routeSessionOp))
{
routeSessionOp->rehostReplica(cb, current, mImpl.get());
WorkPtr replicaOp(routeSessionOp);
- mImpl->scheduleOperation(replicaOp);
- return;
+ mImpl->scheduleOperation(replicaOp);
+ return;
}
WorkPtr op(RouteSessionOperation::create(cb,
@@ -214,6 +223,12 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const AMD_Sessio
const SessionCreationHookPrx& oneShotHook,
const ::Ice::Current& current)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "Duplicate connectBridgedSessionsWithDestination() operation detected and rejected for operation " << operationContext->id;
+ return;
+ }
+
// Check the cache for a replica with this transaction Id.
ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
if (mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->fetchOperation(operationContext->id, connectBridgedSessionsWithDestOp))
@@ -248,6 +263,12 @@ void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCom
bool replaceSession,
const ::Ice::Current& current)
{
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ {
+ lg(Debug) << "Duplicate connectBridgedSessions() operation detected and rejected for operation " << operationContext->id;
+ return;
+ }
+
WorkPtr op(ConnectBridgedSessionsOperation::create(cb,
operationContext,
sessionToReplace,
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index 3edca53..fd70b3f 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -51,11 +51,6 @@ public:
* @param source The session initiating the routing event.
* @param destination The address or id of the destination to be routed.
*/
- void routeSession(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
- const std::string& destination,
- const Ice::Current&);
-
virtual void routeSession_async
(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
@@ -73,11 +68,6 @@ public:
* obtained via an accessor on this interface.
* @param destination The address or id of a destination to be used as a replacement for the specified session.
*/
- void connectBridgedSessionsWithDestination(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
- const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const ::std::string& destination,
- const Ice::Current&);
-
virtual void connectBridgedSessionsWithDestination_async
(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
@@ -95,11 +85,6 @@ public:
* obtained via an accessor on this interface.
* @param newSession The session to be used as a replacement for the specified session.
*/
- void connectBridgedSessions(const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
- const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
- const Ice::Current&);
-
virtual void connectBridgedSessions_async
(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
diff --git a/src/SessionRouterOperation.cpp b/src/SessionRouterOperation.cpp
index bb13127..49ab64a 100644
--- a/src/SessionRouterOperation.cpp
+++ b/src/SessionRouterOperation.cpp
@@ -18,6 +18,7 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Helpers/Retry.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "SessionRouterOperation.h"
@@ -26,6 +27,7 @@ using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::Core::Endpoint::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -44,11 +46,11 @@ namespace BasicRoutingService
* Forward the start() operation to all sessions in a given sequence.
* Caller should catch Ice::Exception
*/
-void forwardStart(SessionSeq& sessions)
+void forwardStart(const OperationContextPtr& operationContext, SessionSeq& sessions)
{
for (SessionSeq::iterator s = sessions.begin(); s != sessions.end(); ++s)
{
- (*s)->start(); // Caller should catch Ice::Exception
+ (*s)->start(createContext(operationContext)); // Caller should catch Ice::Exception
}
}
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index dcc732f..06bc96e 100644
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@ -113,35 +113,39 @@ public:
/**
* Constructor.
* @param amdCallback The callback object to provide results to the initiator of this operation.
+ * @param operationContext Unique ID for this operation as assigned by the caller.
* @param context The SessionContext provides references to key objects needed by each operation.
* @param manager
* @param defaultState The initial state of the operation's state machine.
- * @param operationContext Unique ID for this operation as assigned by the caller.
*/
SessionRouterOperation(const T& amdCallback,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const SessionContextPtr& context,
const ::Ice::Current& current,
OperationsManager* manager,
- S defaultState,
- const AsteriskSCF::System::V1::OperationContextPtr& operationContext)
+ S defaultState)
: mInitiatorCallback(amdCallback),
+ mOperationContext(operationContext),
mSessionContext(context),
mIceCurrent(current),
mFinished(false),
mOperationsManager(manager),
- mStateMachine(defaultState),
- mOperationContext(operationContext)
+ mStateMachine(defaultState)
{
}
/**
* Constructor for inactive replicas.
+ * @param operationContext Unique ID for this operation as assigned by the caller.
* @param context The SessionContext provides references to key objects needed by each operation.
* @param defaultState The initial state of the operation's state machine.
*/
- SessionRouterOperation(const SessionContextPtr& context,
- S defaultState)
- : mSessionContext(context),
+ SessionRouterOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context,
+ S defaultState)
+ : mOperationContext(operationContext),
+ mSessionContext(context),
mFinished(false),
mStateMachine(defaultState)
{
@@ -301,6 +305,7 @@ public:
protected:
T mInitiatorCallback;
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
SessionContextPtr mSessionContext;
::Ice::Current mIceCurrent;
@@ -308,7 +313,6 @@ protected:
SessionListenerManagerPtr mListenerManager;
OperationsManager* mOperationsManager;
AsteriskSCF::StateMachine::SimpleStateMachine<S> mStateMachine;
- AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
@@ -361,7 +365,7 @@ private:
/**
* Forward the start() operation to all sessions in a given sequence.
*/
-void forwardStart(AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+void forwardStart(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
/**
* Provide access to the bridge for a given session.
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list