[asterisk-scf-commits] asterisk-scf/release/routing.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue May 8 17:28:45 CDT 2012
branch "master" has been updated
via ffab8c5f86120d4ca6333284cdca2ebe9518df31 (commit)
from 105cbeb20c63d5656b6cd2bb6867fa184acd8f0f (commit)
Summary of changes:
.../BasicRoutingStateReplicationIf.ice | 46 ++--
src/Component.cpp | 12 +-
src/ConnectBridgedSessionsOperation.cpp | 36 ++-
src/ConnectBridgedSessionsOperation.h | 21 +-
...nectBridgedSessionsWithDestinationOperation.cpp | 82 ++++---
...onnectBridgedSessionsWithDestinationOperation.h | 51 +++--
src/EndpointRegistry.cpp | 230 ++++++++++++++----
src/EndpointRegistry.h | 32 ++-
src/OperationReplicaCache.cpp | 1 +
src/OperationReplicaCache.h | 24 +-
src/RouteSessionOperation.cpp | 92 ++++---
src/RouteSessionOperation.h | 22 ++-
src/RoutingAdmin.cpp | 9 +-
src/RoutingAdmin.h | 4 +-
src/RoutingServiceEventPublisher.cpp | 47 +++--
src/RoutingServiceEventPublisher.h | 37 ++-
src/RoutingStateReplicatorListener.cpp | 113 ++++-----
src/RoutingStateReplicatorListener.h | 10 +-
src/SessionListener.cpp | 45 +++-
src/SessionListener.h | 22 ++-
src/SessionRouter.cpp | 88 +++++--
src/SessionRouter.h | 27 +--
src/SessionRouterOperation.cpp | 95 ++------
src/SessionRouterOperation.h | 103 ++++++---
test/CMakeLists.txt | 2 +
test/MockBridge.cpp | 46 +++--
test/MockBridge.h | 28 ++-
test/MockBridgeManager.cpp | 13 +-
test/MockBridgeManager.h | 20 +-
test/MockEndpointLocator.cpp | 6 +-
test/MockEndpointLocator.h | 4 +-
test/MockSession.cpp | 25 ++-
test/MockSession.h | 18 +-
test/MockSessionEndpoint.cpp | 12 +-
test/MockSessionEndpoint.h | 23 ++-
test/RoutingEventsListener.cpp | 91 +++++++
test/RoutingEventsListener.h | 76 ++++++
test/SharedTestData.h | 3 +
test/TestRouting.cpp | 260 ++++++++++++++++----
39 files changed, 1273 insertions(+), 603 deletions(-)
create mode 100644 test/RoutingEventsListener.cpp
create mode 100644 test/RoutingEventsListener.h
- Log -----------------------------------------------------------------
commit ffab8c5f86120d4ca6333284cdca2ebe9518df31
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue May 8 12:05:58 2012 -0500
Changes for new retry logic.
diff --git a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
index 6082325..c485818 100644
--- a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
@@ -20,6 +20,7 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.ice>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsExtensionPointsIf.ice>
+#include <AsteriskSCF/System/OperationsIf.ice>
module AsteriskSCF
{
@@ -51,6 +52,7 @@ module V1
["visitor:RoutingStateItemVisitor"] class RoutingStateItem
{
string key;
+ AsteriskSCF::System::V1::OperationContext initiatingContext;
};
sequence<RoutingStateItem> RoutingStateItemSeq;
@@ -61,9 +63,9 @@ module V1
*/
interface RoutingStateReplicatorListener
{
- void stateRemoved(Ice::StringSeq itemKeys);
- void stateRemovedForItems(RoutingStateItemSeq items);
- void stateSet(RoutingStateItemSeq items);
+ idempotent void stateRemoved(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq itemKeys);
+ idempotent void stateRemovedForItems(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items);
+ idempotent void stateSet(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items);
};
/**
@@ -71,23 +73,15 @@ module V1
*/
interface RoutingStateReplicator
{
- void addListener(RoutingStateReplicatorListener *listener);
- void removeListener(RoutingStateReplicatorListener *listener);
- void setState (RoutingStateItemSeq items);
- void removeState(Ice::StringSeq items);
- void removeStateForItems(RoutingStateItemSeq items);
+ idempotent void addListener(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateReplicatorListener *listener);
+ idempotent void removeListener(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateReplicatorListener *listener);
+ idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
+ idempotent void removeStateForItems(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items);
idempotent RoutingStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RoutingStateItemSeq getAllState();
};
- /**
- * All transient operations will derive from this.
- */
- class OperationStateItem extends RoutingStateItem
- {
- string operationId;
- };
-
///////////////////////////////////////////////////////////////////////
// These state items represent the state transitions
// of the RouteSession operation.
@@ -97,12 +91,13 @@ module V1
* The key (in the base state item) is the operationId of this
* operation + RouteSessionOpStartKeyMod
*/
- class RouteSessionOpStart extends OperationStateItem
+ class RouteSessionOpStart extends RoutingStateItem
{
AsteriskSCF::SessionCommunications::V1::Session *source;
string destination;
AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHook* hook;
AsteriskSCF::SessionCommunications::PartyIdentification::V1::Caller callerID;
+ AsteriskSCF::System::V1::OperationContext createBridgeContext;
};
const string RouteSessionOpStartKeyMod = ".START";
@@ -111,9 +106,11 @@ module V1
* The key (in the base state item) is the operationId of this
* operation + RouteSessionOpWaitLookupKeyMod
*/
- class RouteSessionOpWaitLookupState extends OperationStateItem
+ class RouteSessionOpWaitLookupState extends RoutingStateItem
{
AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+ AsteriskSCF::System::V1::OperationContextSeq sessionCreateContexts;
+ AsteriskSCF::System::V1::OperationContextSeq startContexts;
};
const string RouteSessionOpWaitLookupKeyMod = ".WAITLOOKUP";
@@ -122,7 +119,7 @@ module V1
* The key (in the base state item) is the operationId of this
* operation + RouteSessionOpBridgingKeyMod
*/
- class RouteSessionOpBridgingState extends OperationStateItem
+ class RouteSessionOpBridgingState extends RoutingStateItem
{
AsteriskSCF::SessionCommunications::V1::Bridge* bridge;
};
@@ -137,11 +134,12 @@ module V1
* The key (in the base state item) is the operationId of this
* operation + ConnectBridgedSessionsWithDestStartKeyMod
*/
- class ConnectBridgedSessionsWithDestinationOpStart extends OperationStateItem
+ class ConnectBridgedSessionsWithDestinationOpStart extends RoutingStateItem
{
AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
string destination;
bool replaceSession;
+ AsteriskSCF::System::V1::OperationContext addOrReplaceSessionContext;
};
const string ConnectBridgedSessionsWithDestStartKeyMod = ".START";
@@ -151,10 +149,12 @@ module V1
* The key (in the base state item) is the operationId of this
* operation + ConnectBridgedSessionsWithDestWaitLookupKeyMod
*/
- class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends OperationStateItem
+ class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends RoutingStateItem
{
AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
AsteriskSCF::SessionCommunications::V1::SessionSeq remainingSessions;
+ AsteriskSCF::System::V1::OperationContextSeq sessionCreateContexts;
+ AsteriskSCF::System::V1::OperationContextSeq startContexts;
};
const string ConnectBridgedSessionsWithDestWaitLookupKeyMod = ".WAITLOOKUP";
@@ -163,7 +163,7 @@ module V1
* The key (in the base state item) is the operationId of this
* operation + ConnectBridgedSessionsWithDestBridgingKeyMod
*/
- class ConnectBridgedSessionsWithDestinationOpBridgingState extends OperationStateItem
+ class ConnectBridgedSessionsWithDestinationOpBridgingState extends RoutingStateItem
{
AsteriskSCF::SessionCommunications::V1::Bridge* bridge;
};
@@ -185,7 +185,7 @@ module V1
AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
AsteriskSCF::Core::Routing::V1::EndpointLocator *locator;
};
-
+
}; //module V1
}; //module BasicRoutingService
}; //module Replication
diff --git a/src/Component.cpp b/src/Component.cpp
index fbd60a2..9afb252 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -109,7 +109,7 @@ private:
// Support objects.
RoutingServiceEventPublisherPtr mEventPublisher;
- boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
+ boost::shared_ptr<OperationReplicaCache> mSessionRouterOperationCache;
// Replication support
AsteriskSCF::Discovery::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
@@ -151,7 +151,7 @@ void Component::listenToStateReplicators()
// Are we in standby mode?
if (routingReplicationContext->getState() == STANDBY_IN_REPLICA_GROUP)
{
- routingReplicationContext->getReplicator().tryOneWay()->addListener(mReplicatorListenerProxy);
+ routingReplicationContext->getReplicator().tryOneWay()->addListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = true;
}
}
@@ -180,7 +180,7 @@ void Component::stopListeningToStateReplicators()
try
{
- routingReplicationContext->getReplicator().tryOneWay()->removeListener(mReplicatorListenerProxy);
+ routingReplicationContext->getReplicator().tryOneWay()->removeListener(AsteriskSCF::Operations::createContext(), mReplicatorListenerProxy);
mListeningToReplicator = false;
}
catch (const Ice::Exception& e)
@@ -312,10 +312,10 @@ void Component::createPrimaryServices()
routingReplicationContext));
// Create the replica cache.
- mOperationReplicaCache = OperationReplicaCachePtr(new OperationReplicaCache(mSessionContext));
+ mSessionRouterOperationCache = OperationReplicaCachePtr(new OperationReplicaCache(mSessionContext));
// Create the SessionRouter interface.
- mSessionRouter = new SessionRouter(mSessionContext, mOperationReplicaCache);
+ mSessionRouter = new SessionRouter(mSessionContext, mSessionRouterOperationCache);
mSessionRouterPrx = SessionRouterPrx::uncheckedCast(
getServiceAdapter()->add(mSessionRouter, getCommunicator()->stringToIdentity(SessionRouterObjectId)));
@@ -338,7 +338,7 @@ void Component::createReplicationStateListeners()
try
{
// Create and publish our state replicator listener interface on the backplane adapter.
- mReplicatorListener = new RoutingStateReplicatorListenerImpl(mEndpointRegistry, mOperationReplicaCache);
+ mReplicatorListener = new RoutingStateReplicatorListenerImpl(mEndpointRegistry, mSessionRouterOperationCache);
prx =
RoutingStateReplicatorListenerPrx::uncheckedCast(getBackplaneAdapter()->addWithUUID(mReplicatorListener));
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index e3955cc..725dc0a 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -17,6 +17,7 @@
#include <boost/bind.hpp>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "ConnectBridgedSessionsOperation.h"
@@ -24,11 +25,14 @@ using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
-}
+
+} // end unnamed namespace
namespace AsteriskSCF
{
@@ -47,8 +51,8 @@ namespace BasicRoutingService
*/
ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
- (const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
- const std::string& operationId,
+ (const AMDContextData<AMD_SessionRouter_connectBridgedSessionsPtr>::ptr_type& cookie,
+ const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
bool replaceSession,
@@ -57,12 +61,13 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr,
ConnectBridgedSessionsOp::OperationState>
- (cb,
+ (cookie,
+ operationContext,
context,
current,
listener,
- ConnectBridgedSessionsOp::STATE_CONNECT,
- operationId),
+ ConnectBridgedSessionsOp::STATE_CONNECT
+ ),
mSessionToReplace(sessionToReplace),
mBridgedSession(bridgedSession),
mReplaceSession(replaceSession)
@@ -75,8 +80,8 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
* Factory method for the operation.
*/
ConnectBridgedSessionsOperationPtr ConnectBridgedSessionsOperation::create
- (const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
- const std::string& operationId,
+ (const AMDContextData<AMD_SessionRouter_connectBridgedSessionsPtr>::ptr_type& cookie,
+ const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
bool replaceSession,
@@ -85,8 +90,8 @@ ConnectBridgedSessionsOperationPtr ConnectBridgedSessionsOperation::create
OperationsManager* const listener)
{
- ConnectBridgedSessionsOperationPtr ptr(new ConnectBridgedSessionsOperation(cb,
- operationId,
+ ConnectBridgedSessionsOperationPtr ptr(new ConnectBridgedSessionsOperation(cookie,
+ operationContext,
sessionToReplace,
bridgedSession,
replaceSession,
@@ -136,7 +141,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
// Create a listener for the sessions not being replaced to handle early termination.
lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
mListenerManager = SessionListenerManagerPtr(
- new SessionListenerManager(mSessionContext->adapter, preserveSessions));
+ new SessionListenerManager(mOperationContext, mSessionContext->adapter, preserveSessions));
// Get the bridge for the sessions being moved.
BridgePrx oldBridge;
@@ -150,7 +155,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
return;
}
- SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
+ SessionSeq migratingSessions = removeSessionsFromBridge(AsteriskSCF::Operations::createContext(mOperationContext), oldBridge, mBridgedSession);
SessionWithSessionInfoSeq infoSeq;
for (SessionSeq::iterator sessionIter = migratingSessions.begin(); sessionIter != migratingSessions.end(); ++sessionIter)
@@ -180,12 +185,15 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
if (mReplaceSession)
{
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Asking bridge to add sessions." ;
- mergeBridge->replaceSession(mSessionToReplace, infoSeq);
+ mergeBridge->replaceSession(
+ calculateOperationContext(mOperationContext,"replaceSessions"),
+ mSessionToReplace,
+ infoSeq);
}
else
{
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Asking bridge to replace sessions." ;
- mergeBridge->addSessions(infoSeq);
+ mergeBridge->addSessions(calculateOperationContext(mOperationContext,"addSessions"), infoSeq);
}
}
catch(const Ice::Exception& e)
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index 3313664..dcd8b21 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -60,8 +60,8 @@ class ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskS
{
public:
static ConnectBridgedSessionsOperationPtr create(
- const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
- const std::string& operationId,
+ const AsteriskSCF::Operations::AMDContextData<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>::ptr_type& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
bool replaceSession,
@@ -72,14 +72,15 @@ public:
virtual ~ConnectBridgedSessionsOperation();
protected:
- ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
- const std::string& operationId,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
- bool replaceSession,
- const Ice::Current& current,
- const SessionContextPtr& context,
- OperationsManager* const listener);
+ ConnectBridgedSessionsOperation(
+ const AsteriskSCF::Operations::AMDContextData<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>::ptr_type& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ bool replaceSession,
+ const Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
private:
/**
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 03d54f6..d66a72d 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -17,6 +17,7 @@
#include <boost/bind.hpp>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "BasicRoutingStateReplicationIf.h"
#include "ConnectBridgedSessionsWithDestinationOperation.h"
@@ -29,6 +30,8 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::BasicRoutingService;
using namespace AsteriskSCF::Replication::BasicRoutingService::V1;
using namespace AsteriskSCF::StateMachine;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -49,9 +52,9 @@ class ConnectBridgedSessionsWithDestReplicatingListener :
public:
ConnectBridgedSessionsWithDestReplicatingListener(ConnectBridgedSessionsWithDestinationOperationPtr op,
const RoutingReplicationContextPtr& replication)
- : mOperationId(op->getOperationId()),
- mOperation(op),
- mReplicationContext(replication)
+ : mOperation(op),
+ mReplicationContext(replication),
+ mOperationId(mOperation->getOperationContext()->id)
{
}
@@ -77,12 +80,13 @@ public:
// Push this information to the state replicator.
ConnectBridgedSessionsWithDestinationOpStartPtr
opStart(new ConnectBridgedSessionsWithDestinationOpStart());
- opStart->operationId = mOperation->getOperationId();
- opStart->key = mOperation->getOperationId() +
+ opStart->initiatingContext = mOperation->getOperationContext();
+ opStart->key = mOperation->getOperationContext()->id +
ConnectBridgedSessionsWithDestStartKeyMod;
opStart->sessionToReplace = mOperation->getSessionToReplace();
opStart->destination = mOperation->getDestination();
opStart->replaceSession = mOperation->getReplaceSession();
+ opStart->addOrReplaceSessionContext = mOperation->getAddOrReplaceSessionContext();
pushState(opStart);
}
@@ -93,11 +97,13 @@ public:
// We've obtained a result from our AMI lookup request.
ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr
waitLookup(new ConnectBridgedSessionsWithDestinationOpWaitLookupState());
- waitLookup->operationId = mOperation->getOperationId();
- waitLookup->key = mOperation->getOperationId() +
+ waitLookup->initiatingContext = mOperation->getOperationContext();
+ waitLookup->key = mOperation->getOperationContext()->id +
RouteSessionOpWaitLookupKeyMod;
waitLookup->endpoints = mOperation->getLookupResult();
waitLookup->remainingSessions = mOperation->getRemainingSessions();
+ waitLookup->sessionCreateContexts = mOperation->getCreateSessionContexts();
+ waitLookup->startContexts = mOperation->getStartSessionContexts();
pushState(waitLookup);
}
@@ -113,7 +119,7 @@ public:
RoutingStateItemSeq setItems;
setItems.push_back(item);
- mReplicationContext->getReplicator()->setState(setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
// Cache the replicated items.
mReplicatedState.push_back(item);
@@ -137,7 +143,7 @@ public:
{
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions from the state replicator.
- mReplicationContext->getReplicator()->removeStateForItems(mReplicatedState);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), mReplicatedState);
}
}
catch(...)
@@ -169,8 +175,8 @@ public:
// Push this information to the state replicator.
ConnectBridgedSessionsWithDestinationOpBridgingStatePtr
bridgeOp(new ConnectBridgedSessionsWithDestinationOpBridgingState());
- bridgeOp->operationId = mOperation->getOperationId();
- bridgeOp->key = mOperation->getOperationId() +
+ bridgeOp->initiatingContext = mOperation->getOperationContext();
+ bridgeOp->key = mOperation->getOperationContext()->id +
RouteSessionOpBridgingKeyMod;
bridgeOp->bridge = mOperation->getBridge();
@@ -186,9 +192,9 @@ public:
private:
RoutingStateItemSeq mReplicatedState;
- std::string mOperationId;
ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
RoutingReplicationContextPtr mReplicationContext;
+ std::string mOperationId;
}; // end ConnectBridgedSessionsWithDestReplicatingListener
@@ -211,8 +217,8 @@ void ConnectBridgedSessionsWithDestinationOperation::initStateMachine()
* This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
*/
ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(
- const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const std::string& operationId,
+ const AMDContextData<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::ptr_type& cookie,
+ const OperationContextPtr& operationContext,
const SessionPrx& sessionToReplace,
const ::std::string& destination,
bool replaceSession,
@@ -221,16 +227,18 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
const SessionContextPtr& context,
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr,
- ConnectBridgedSessionsWithDestinationOp::OperationState>(cb,
+ ConnectBridgedSessionsWithDestinationOp::OperationState>(cookie,
+ operationContext,
context,
current,
listener,
- ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
- operationId),
+ ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP
+ ),
mSessionToReplace(sessionToReplace),
mDestination(destination),
mReplaceSession(replaceSession),
- mHook(oneShotHook)
+ mHook(oneShotHook),
+ mAddOrReplaceSessionContext(Operations::createContext(operationContext))
{
initStateMachine();
}
@@ -239,8 +247,8 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
* This is the factory method for this operation.
*/
ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDestinationOperation::create(
- const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const std::string& operationId,
+ const AMDContextData<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::ptr_type& cookie,
+ const OperationContextPtr& operationContext,
const SessionPrx& sessionToReplace,
const ::std::string& destination,
bool replaceSession,
@@ -250,8 +258,9 @@ ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDest
OperationsManager* const listener)
{
- ConnectBridgedSessionsWithDestinationOperationPtr op( new ConnectBridgedSessionsWithDestinationOperation(cb,
- operationId,
+ ConnectBridgedSessionsWithDestinationOperationPtr op( new ConnectBridgedSessionsWithDestinationOperation(
+ cookie,
+ operationContext,
sessionToReplace,
destination,
replaceSession,
@@ -270,10 +279,13 @@ ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDest
/**
* Constructor to service replicas.
*/
-ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context)
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr,
ConnectBridgedSessionsWithDestinationOp::OperationState>
- (context,
+ (operationContext,
+ context,
ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP)
{
initStateMachine();
@@ -283,9 +295,9 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
* Factory for replica objects.
*/
ConnectBridgedSessionsWithDestinationOperationPtr
- ConnectBridgedSessionsWithDestinationOperation::createReplica(const SessionContextPtr& context)
+ ConnectBridgedSessionsWithDestinationOperation::createReplica(const OperationContextPtr& operationContext, const SessionContextPtr& sessionContext)
{
- ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(context));
+ ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(operationContext, sessionContext));
return op;
}
@@ -295,7 +307,7 @@ ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDesti
}
void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
- const OperationStateItemPtr& stateItem)
+ const RoutingStateItemPtr& stateItem)
{
ConnectBridgedSessionsWithDestinationOpStartPtr start;
ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup;
@@ -320,8 +332,9 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
{
mSessionToReplace = item->sessionToReplace;
mDestination = item->destination;
- mOperationId = item->operationId;
+ mOperationContext = item->initiatingContext;
mReplaceSession = item->replaceSession;
+ mAddOrReplaceSessionContext = item->addOrReplaceSessionContext;
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
}
@@ -330,6 +343,9 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
const ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& item)
{
mLookupResult = item->endpoints;
+ mCreateSessionContexts = item->sessionCreateContexts;
+ mStartSessionContexts = item->startContexts;
+
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
}
@@ -382,7 +398,7 @@ bool ConnectBridgedSessionsWithDestinationOperation::fastForwardReplica()
void ConnectBridgedSessionsWithDestinationOperation::addListenerManager()
{
- mListenerManager.reset(new SessionListenerManager(mSessionContext->adapter, this->mSessionToReplace));
+ mListenerManager.reset(new SessionListenerManager(mOperationContext, mSessionContext->adapter, this->mSessionToReplace));
}
/**
@@ -461,7 +477,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
}
// Add a session
- SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination, mHook, mListenerManager);
+ SessionSeq newSessions = createSessionForEndpoints(mCreateSessionContexts, mLookupResult, mDestination, mHook, mListenerManager);
SessionWithSessionInfoSeq infoSeq;
for (SessionSeq::iterator sessionIter = newSessions.begin(); sessionIter != newSessions.end(); ++sessionIter)
{
@@ -494,12 +510,12 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
if (mReplaceSession)
{
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Replacing session with newly routed destination " << mDestination;
- mBridge->replaceSession(mSessionToReplace, infoSeq);
+ mBridge->replaceSession(mAddOrReplaceSessionContext, mSessionToReplace, infoSeq);
}
else
{
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Adding newly routed destination to session's bridge " << mDestination;
- mBridge->addSessions(infoSeq);
+ mBridge->addSessions(mAddOrReplaceSessionContext, infoSeq);
}
}
catch (const Ice::Exception &e)
@@ -513,7 +529,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
try
{
- forwardStart(newSessions);
+ forwardStart(mStartSessionContexts, newSessions);
}
catch (const Ice::Exception &e)
{
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index b9f2fb0..0b22d6c 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -67,15 +67,16 @@ public:
/**
* Factory method for the class. This method creates an active operation.
*/
- static ConnectBridgedSessionsWithDestinationOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const std::string& operationId,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const std::string& destination,
- bool replaceSession,
- const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
- const ::Ice::Current& current,
- const SessionContextPtr& context,
- OperationsManager* const listener);
+ static ConnectBridgedSessionsWithDestinationOperationPtr create(
+ const AsteriskSCF::Operations::AMDContextData<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::ptr_type& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ bool replaceSession,
+ const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
virtual ~ConnectBridgedSessionsWithDestinationOperation();
@@ -91,12 +92,14 @@ public:
/**
* Factory method for replica objects.
*/
- static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(const SessionContextPtr& context);
+ static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
/**
* Update a replica object with new state information.
*/
- void reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr& stateItem);
+ void reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateItemPtr& stateItem);
/**
* Set the state machine into the highest state possible based on all of the state updates
@@ -110,19 +113,24 @@ public:
*/
AsteriskSCF::SessionCommunications::V1::SessionSeq getRemainingSessions() {return mRemainingSessions;}
+ AsteriskSCF::System::V1::OperationContextPtr getAddOrReplaceSessionContext() {return mAddOrReplaceSessionContext;}
+
protected:
- ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const std::string& operationId,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const std::string& destination,
- bool replaceSession,
- const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
- const ::Ice::Current& current,
- const SessionContextPtr& context,
- OperationsManager* const listener);
+ ConnectBridgedSessionsWithDestinationOperation(
+ const AsteriskSCF::Operations::AMDContextData<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::ptr_type& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ bool replaceSession,
+ const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
// Constructor to service replicas.
- ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context);
+ ConnectBridgedSessionsWithDestinationOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
private:
void initStateMachine();
@@ -180,6 +188,7 @@ private:
AsteriskSCF::SessionCommunications::V1::SessionSeq mRemainingSessions;
std::vector<ConnectBridgedSessionsWithDestinationOp::OperationState> mReplicatedStates;
+ AsteriskSCF::System::V1::OperationContextPtr mAddOrReplaceSessionContext;
}; // class ConnectBridgedSessionsWithDestinationOperation
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 0f37e78..211ed4b 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -19,10 +19,12 @@
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include "RoutingServiceEventPublisher.h"
#include "EndpointRegistry.h"
-#include "RoutingServiceEventPublisher.h"
#include "ScriptProcessor.h"
using namespace ::std;
@@ -33,6 +35,8 @@ using namespace ::AsteriskSCF::Core::Routing::V1::Event;
using namespace ::AsteriskSCF::BasicRoutingService;
using namespace ::AsteriskSCF::Replication::BasicRoutingService::V1;
using namespace ::AsteriskSCF::Discovery;
+using namespace ::AsteriskSCF::System::V1;
+using namespace ::AsteriskSCF::Operations;
namespace
{
@@ -81,11 +85,12 @@ class EndpointRegistryPriv
{
public:
EndpointRegistryPriv(const ScriptProcessorPtr& scriptProcessor,
- const RoutingEventsPtr& eventPublisher,
+ const RoutingServiceEventPublisherPtr& eventPublisher,
const RoutingReplicationContextPtr& replicationContext) :
mScriptProcessor(scriptProcessor),
mEventPublisher(eventPublisher),
- mReplicationContext(replicationContext)
+ mReplicationContext(replicationContext),
+ mOperationContextCache(OperationContextCache::create(180))
{
}
@@ -122,8 +127,16 @@ public:
/**
* Forwards the result of processing a remove endpoint operation.
*/
- void forwardRemoveEndpointLocator(const std::string& locatorId, Event::OperationResult result)
+ void forwardRemoveEndpointLocator(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ Event::OperationResult result)
{
+ if (mReplicationContext->isActive())
+ {
+ // Forward to event publisher.
+ mEventPublisher->removeEndpointLocatorEvent(operationContext, locatorId, result);
+ }
+
if (!mReplicationContext->isReplicating())
{
return;
@@ -139,29 +152,75 @@ public:
EndpointLocatorStatePtr addEndpointItem(new EndpointLocatorState());
addEndpointItem->key = locatorId;
+ addEndpointItem->initiatingContext = operationContext;
removeItems.push_back(addEndpointItem);
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state removal for locator " << locatorId;
- mReplicationContext->getReplicator()->removeStateForItems(removeItems);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
}
catch(const Ice::Exception& e)
{
lg(Debug) << "EndpointRegistry unable to replicate removeEndpointLocator(): " << e.what();
}
}
+ }
+
+ /**
+ * Forwards the result of processing a clearEndpointLocators() operation.
+ */
+ void forwardClearEndpointLocators(
+ const OperationContextPtr& operationContext,
+ const vector<string>& keys)
+ {
+ if (mReplicationContext->isActive())
+ {
+ // Forward to event publisher.
+ mEventPublisher->clearEndpointLocatorsEvent(operationContext);
+ }
+
+ if (!mReplicationContext->isReplicating())
+ {
+ return;
+ }
+
+ // Forward to state replicator
+ try
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq removeItems;
- // Forward to event publisher.
- mEventPublisher->removeEndpointLocatorEvent(locatorId, result);
+ vector<string>::const_iterator iter;
+ for (iter = keys.begin(); iter != keys.end(); iter++)
+ {
+ EndpointLocatorStatePtr addEndpointItem(new EndpointLocatorState());
+ addEndpointItem->initiatingContext = operationContext;
+ addEndpointItem->key = *iter;
+ removeItems.push_back(addEndpointItem);
+ }
+ lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state removal for all locators. ";
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Debug) << "EndpointRegistry unable to replicate clearEndpointLocator(): " << e.what();
+ }
}
/**
* Forwards the result of processing an add endpoint locator operation.
*/
- void forwardAddEndpointLocator(const std::string& locatorId,
+ void forwardAddEndpointLocator(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
const RegExSeq& regexList,
const EndpointLocatorPrx& locator,
Event::OperationResult result)
{
+ if (mReplicationContext->isActive())
+ {
+ // Forward to event publisher.
+ mEventPublisher->addEndpointLocatorEvent(operationContext, locatorId, regexList, locator, result);
+ }
+
if (!mReplicationContext->isReplicating())
{
return;
@@ -179,11 +238,12 @@ public:
addEndpointItem->key = locatorId;
addEndpointItem->locator = locator;
addEndpointItem->regExList = regexList;
+ addEndpointItem->initiatingContext = operationContext;
setItems.push_back(addEndpointItem);
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state update for new locator " << locatorId;
- mReplicationContext->getReplicator()->setState(setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
}
catch(const Ice::Exception& e)
@@ -192,15 +252,19 @@ public:
}
}
- // Forward to event publisher.
- mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, result);
}
- void forwardEndpointLocatorDestIdChange(const std::string& locatorId,
+ void forwardEndpointLocatorDestIdChange(const OperationContextPtr& operationContext,
+ const std::string& locatorId,
const RegExSeq& regexList,
const EndpointLocatorPrx& locator,
OperationResult result)
{
+ if (mReplicationContext->isActive())
+ {
+ mEventPublisher->setEndpointLocatorDestinationIdsEvent(operationContext, locatorId, regexList, result);
+ }
+
if (!mReplicationContext->isReplicating())
{
return;
@@ -219,7 +283,7 @@ public:
removeItem->key = locatorId;
removeItems.push_back(removeItem);
- mReplicationContext->getReplicator()->removeStateForItems(removeItems);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), removeItems);
// Now add the item with the new values.
RoutingStateItemSeq setItems;
@@ -227,19 +291,18 @@ public:
addEndpointItem->key = locatorId;
addEndpointItem->locator = locator;
addEndpointItem->regExList = regexList;
+ addEndpointItem->initiatingContext = operationContext;
setItems.push_back(addEndpointItem);
- mReplicationContext->getReplicator()->setState(setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
}
}
catch(const Ice::Exception& e)
{
- lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
+ lg(Debug) << "EndpointRegistry unable to replicate setEndpointLocatorDestinationIds(): " << e.what();
}
}
-
- mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
}
/**
@@ -257,8 +320,9 @@ public:
ScriptProcessorPtr mScriptProcessor;
EndpointLocatorMap mEndpointLocatorMap;
- const RoutingEventsPtr mEventPublisher;
+ const RoutingServiceEventPublisherPtr mEventPublisher;
RoutingReplicationContextPtr mReplicationContext;
+ OperationContextCachePtr mOperationContextCache;
};
/**
@@ -280,7 +344,7 @@ public:
*/
LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback,
const std::string& destination,
- const RoutingEventsPtr& eventPublisher,
+ const RoutingServiceEventPublisherPtr& eventPublisher,
size_t numVotes)
: mCallback(callback),
mNumVotes(numVotes),
@@ -355,7 +419,7 @@ private:
boost::mutex mLock;
AMD_EndpointLocator_lookupPtr mCallback;
size_t mNumVotes;
- RoutingEventsPtr mEventPublisher;
+ RoutingServiceEventPublisherPtr mEventPublisher;
std::string mDestination;
};
typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
@@ -364,7 +428,7 @@ typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
* Constructor.
*/
EndpointRegistry::EndpointRegistry(const ScriptProcessorPtr& scriptProcessor,
- const RoutingEventsPtr& eventPublisher,
+ const RoutingServiceEventPublisherPtr& eventPublisher,
const RoutingReplicationContextPtr& replicationContext) :
mImpl(new EndpointRegistryPriv(scriptProcessor, eventPublisher, replicationContext))
{
@@ -439,9 +503,20 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
* @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
* the locator being added supports.
*/
-void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const RegExSeq& regexList, const EndpointLocatorPrx& locator,
+void EndpointRegistry::addEndpointLocator(
+ const OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const RegExSeq& regexList,
+ const EndpointLocatorPrx& locator,
const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImpl->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "EndpointRegistry::addEndpointLocator() detected retry for operation " << operationContext->id;
+ return;
+ }
+
try
{
lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString();
@@ -458,13 +533,15 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
RegisteredLocator newLocator(locator, regexList);
mImpl->insertLocatorMapItem(locatorId, newLocator);
- mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::SUCCESS);
+ mImpl->forwardAddEndpointLocator(operationContext, locatorId, regexList, locator, Event::SUCCESS);
+ contextData->setCompleted();
}
- catch (...)
+ catch (const std::exception& e)
{
- lg(Error) << "Exception adding EndpointLocator.";
- mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::FAILURE);
- return;
+ lg(Error) << "Exception adding EndpointLocator:" << e.what();
+ mImpl->forwardAddEndpointLocator(operationContext, locatorId, regexList, locator, Event::FAILURE);
+ contextData->setException(ExceptionWrapper::create(e));
+ throw;
}
}
@@ -472,8 +549,15 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
* Remove an EndpointLocator.
* @param The unique id of the locator to remove.
*/
-void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const Ice::Current&)
+void EndpointRegistry::removeEndpointLocator(const OperationContextPtr& operationContext, const std::string& locatorId, const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImpl->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "EndpointRegistry::removeEndpointLocator() detected retry for operation " << operationContext->id;
+ return;
+ }
+
try
{
lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
@@ -484,20 +568,23 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
if (!exists)
{
lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
- mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
+ mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::FAILURE);
+ contextData->setCompleted();
return;
}
mImpl->eraseLocatorMapItem(locatorId);
- mImpl->forwardRemoveEndpointLocator(locatorId, Event::SUCCESS);
+ mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::SUCCESS);
lg(Info) << "Removed Endpoint Locator with Id = " << locatorId;
+ contextData->setCompleted();
}
catch(const std::exception &e)
{
- mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
- lg(Error) << e.what();
+ mImpl->forwardRemoveEndpointLocator(operationContext, locatorId, Event::FAILURE);
+ lg(Error) << "Exception removing EndpointLocator:" << e.what();
+ contextData->setException(ExceptionWrapper::create(e));
}
}
@@ -507,9 +594,19 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
* @param A list of regular expressions that define the valid endpoint ids. This
* set of regular expressions completely replaces the current set.
*/
-void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locatorId,
- const AsteriskSCF::Core::Routing::V1::RegExSeq& regExList, const Ice::Current&)
+void EndpointRegistry::setEndpointLocatorDestinationIds(
+ const OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regExList,
+ const Ice::Current&)
{
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImpl->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "EndpointRegistry::setEndpointLocatorDestinationIds() detected retry for operation " << operationContext->id;
+ return;
+ }
+
try
{
EndpointLocatorMapIterator existing;
@@ -517,19 +614,21 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
if (!exists)
{
- mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, 0, Event::FAILURE);
throw DestinationNotFoundException(locatorId);
}
// Replace the regular expression.
existing->second.setRegEx(regExList);
- mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, existing->second.locator, Event::SUCCESS);
+ mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, existing->second.locator, Event::SUCCESS);
+ contextData->setCompleted();
}
catch(const std::exception &e)
{
- mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, 0, Event::FAILURE);
+ mImpl->forwardEndpointLocatorDestIdChange(operationContext, locatorId, regExList, 0, Event::FAILURE);
lg(Error) << "Exception modifying the destination specifications for EndpointLocator " << locatorId;
lg(Error) << " - " << e.what();
+ contextData->setException(ExceptionWrapper::create(e));
+ throw;
}
}
@@ -545,10 +644,33 @@ void EndpointRegistry::setScriptProcessor(const ScriptProcessorPtr& scriptProces
* Drop references to all EndpointLocators that have been registered.
* Note: Admin function.
*/
-void EndpointRegistry::clearEndpointLocators()
+void EndpointRegistry::clearEndpointLocators(const OperationContextPtr& operationContext)
{
- mImpl->clearEndpointLocatorMap();
- mImpl->mEventPublisher->clearEndpointLocatorsEvent();
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImpl->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "EndpointRegistry::clearEndpointLocators() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ // Grab the keys being removed.
+ vector<string> keys;
+ { // scope the lock
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ map<std::string, RegisteredLocator>::const_iterator iter;
+
+ for (iter = mImpl->mEndpointLocatorMap.begin(); iter != mImpl->mEndpointLocatorMap.end(); ++iter)
+ {
+ keys.push_back(iter->first);
+ }
+
+ // Since we've got the lock, clear the map now.
+ mImpl->mEndpointLocatorMap.clear();
+ }
+
+ mImpl->forwardClearEndpointLocators(operationContext, keys);
+
+ contextData->setCompleted();
}
/**
@@ -557,10 +679,32 @@ void EndpointRegistry::clearEndpointLocators()
* Note: Admin function.
* @param policy A site-specific policy specification.
*/
-void EndpointRegistry::setPolicy(const std::string& policy)
+void EndpointRegistry::setPolicy(const OperationContextPtr& operationContext, const std::string& policy)
{
- mImpl->mScriptProcessor->setPolicy(policy);
- mImpl->mEventPublisher->setPolicyEvent(policy);
+ ContextDataPtr contextData;
+ if (!(contextData = Operations::checkAndThrow(mImpl->mOperationContextCache, operationContext)))
+ {
+ lg(Debug) << "EndpointRegistry::setPolicy() detected retry for operation " << operationContext->id;
+ return;
+ }
+
+ try
+ {
+ mImpl->mScriptProcessor->setPolicy(policy);
+
+ if (mImpl->mReplicationContext->isActive())
+ {
+ mImpl->mEventPublisher->setPolicyEvent(operationContext, policy);
+ }
+
+ contextData->setCompleted();
+ }
+ catch (const std::exception& e)
+ {
+ lg(Error) << "Exception setting polocy:" << e.what();
+ contextData->setException(ExceptionWrapper::create(e));
+ throw;
+ }
}
} // end BasicRoutingService
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index e1413c5..85331b2 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -21,6 +21,7 @@
#include <AsteriskSCF/Discovery/SmartProxy.h>
#include "BasicRoutingStateReplicationIf.h"
+#include "RoutingServiceEventPublisher.h"
#include "RoutingReplicationContext.h"
#include "ScriptProcessor.h"
@@ -35,7 +36,7 @@ class EndpointRegistry : public AsteriskSCF::Core::Routing::V1::LocatorRegistry
{
public:
EndpointRegistry(const ScriptProcessorPtr& scriptProcessor,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const RoutingServiceEventPublisherPtr& eventPublisher,
const RoutingReplicationContextPtr& replicationContext);
/**
@@ -53,15 +54,22 @@ public:
* @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
* the locator being added supports.
*/
- void addEndpointLocator(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
- const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator, const Ice::Current&);
+ void addEndpointLocator(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
+ const Ice::Current&);
/**
* Remove an EndpointLocator from the registry. The EndpointLocator must have been previously added
* via a call to addEndpointLocator.
* @param The unique id of the locator to remove.
*/
- void removeEndpointLocator(const std::string& locatorId, const Ice::Current& );
+ void removeEndpointLocator(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const Ice::Current& );
/**
@@ -70,7 +78,10 @@ public:
* @param A list of regular expressions that define the valid endpoint ids. This
* set of regular expressions completely replaces the current set.
*/
- void setEndpointLocatorDestinationIds(const std::string& locatorId, const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
+ void setEndpointLocatorDestinationIds(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
const Ice::Current&);
// EndpointLocator overrides
@@ -79,21 +90,22 @@ public:
* Returns the endpoints that match the specified destination id.
* @param id String identifier of the destination.
*/
- virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
-
-public:
+ void lookup_async(
+ const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb,
+ const ::std::string& destination,
+ const ::Ice::Current&);
/**
* Drop references to all EndpointLocators that have been registered.
*/
- void clearEndpointLocators();
+ void clearEndpointLocators(const AsteriskSCF::System::V1::OperationContextPtr& operationContext);
/**
* Sends a policy string to the script processor. The default implementation is a no-op,
* but site-specific scripts may make use of it.
* @param policy A site-specific policy specification.
*/
- void setPolicy(const std::string& policy);
+ void setPolicy(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const std::string& policy);
private:
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
index 56c22e6..066be7c 100644
--- a/src/OperationReplicaCache.cpp
+++ b/src/OperationReplicaCache.cpp
@@ -14,6 +14,7 @@
* at the top of the source tree.
*/
#include <boost/thread/locks.hpp>
+#include <boost/thread.hpp> // XXX for weird windows build issue, revisit
#include <AsteriskSCF/Threading/WorkQueue.h>
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index 740c66c..9e39668 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -37,10 +37,10 @@ class OperationCachePriv;
class OperationReplicaCachePriv;
class SessionContext;
-typedef std::map<std::string, AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
+typedef std::map<std::string, AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateItemPtr> StateItemMapType;
/**
- * For each transaction id, we're going to cache the all the state items for the operation.
+ * For each operation id, we're going to cache all the state items for the operation.
* The reason for this is that we don't want to rely on the order in which we will receive the state updates.
* This class is used to hold all the state updates for a given operation.
*
@@ -71,19 +71,19 @@ public:
{
}
- void cacheOperationState(const AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr& item)
+ void cacheOperationState(const AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateItemPtr& item)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- // See if this transaction is in the cache.
- typename OpMapType::iterator i = mReplicas.find(item->operationId);
+ // See if this operation is in the cache.
+ typename OpMapType::iterator i = mReplicas.find(item->initiatingContext->id);
if (i == mReplicas.end())
{
// Add an entry to the cache.
OperationReplicaItem< boost::shared_ptr<O> > replica;
- mReplicas[item->operationId] = replica;
+ mReplicas[item->initiatingContext->id] = replica;
- i = mReplicas.find(item->operationId);
+ i = mReplicas.find(item->initiatingContext->id);
}
// Add this item to the replica's collection of state items.
@@ -92,7 +92,7 @@ public:
// If we haven't created the replica yet, do so now.
if ((*i).second.mOperation.get() == 0)
{
- boost::shared_ptr<O> workPtr(O::createReplica(mSessionContext));
+ boost::shared_ptr<O> workPtr(O::createReplica(item->initiatingContext, mSessionContext));
(*i).second.mOperation = workPtr;
}
@@ -100,11 +100,11 @@ public:
(*i).second.mOperation->reflectUpdate(item);
}
- bool fetchOperation(std::string transactionId, boost::shared_ptr<O>& outRef)
+ bool fetchOperation(std::string operationId, boost::shared_ptr<O>& outRef)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- typename OpMapType::iterator i = mReplicas.find(transactionId);
+ typename OpMapType::iterator i = mReplicas.find(operationId);
if (i == mReplicas.end())
{
return false;
@@ -118,11 +118,11 @@ public:
return true;
}
- void dropOperation(std::string transactionId)
+ void dropOperation(std::string operationId)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- typename OpMapType::iterator i = mReplicas.find(transactionId);
+ typename OpMapType::iterator i = mReplicas.find(operationId);
if (i != mReplicas.end())
{
mReplicas.erase(i);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 445ee9b..095e793 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -16,6 +16,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/Component/TestContext.h>
@@ -23,7 +24,6 @@
#include "RoutingReplicationContext.h"
#include "RouteSessionOperation.h"
-
using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -31,6 +31,8 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::BasicRoutingService;
using namespace AsteriskSCF::Replication::BasicRoutingService::V1;
using namespace AsteriskSCF::StateMachine;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::Operations;
namespace
{
@@ -52,15 +54,15 @@ class RouteSessionReplicatingListener :
public:
RouteSessionReplicatingListener(RouteSessionOperationPtr op,
const RoutingReplicationContextPtr& replicationContext)
- : mTransactionId(op->getOperationId()),
- mOperation(op),
- mReplicationContext(replicationContext)
+ : mOperation(op),
+ mReplicationContext(replicationContext),
+ mOperationId(mOperation->getOperationContext()->id)
{
}
~RouteSessionReplicatingListener()
{
- lg(Debug) << "RouteSessionReplicatingListener() being destroyed for transaction " << mTransactionId;
+ lg(Debug) << "RouteSessionReplicatingListener() being destroyed for operation context " << mOperationId;
}
/**
@@ -77,13 +79,15 @@ public:
{
case RouteSessionOp::STATE_LOOKUP:
{
- // This is the initial state. All the state of interest is what's been passed in.
+ // This is the initial state. All the state of interest is what's been passed in
+ // and our pre-computed operation contexts for outgoing calls.
// Push this information to the state replicator.
RouteSessionOpStartPtr routeSessionOpStart(new RouteSessionOpStart());
- routeSessionOpStart->operationId = mOperation->getOperationId();
- routeSessionOpStart->key = mOperation->getOperationId() + RouteSessionOpStartKeyMod;
+ routeSessionOpStart->initiatingContext = mOperation->getOperationContext();
+ routeSessionOpStart->key = mOperation->getOperationContext()->id + RouteSessionOpStartKeyMod;
routeSessionOpStart->source = mOperation->getSource();
routeSessionOpStart->destination = mOperation->getDestination();
+ routeSessionOpStart->createBridgeContext = mOperation->getCreateBridgeContext();
pushState(routeSessionOpStart);
}
@@ -93,9 +97,11 @@ public:
{
// We've obtained a result from our AMI lookup request.
RouteSessionOpWaitLookupStatePtr routeSessionOpWaitLookup(new RouteSessionOpWaitLookupState());
- routeSessionOpWaitLookup->operationId = mOperation->getOperationId();
- routeSessionOpWaitLookup->key = mOperation->getOperationId() + RouteSessionOpWaitLookupKeyMod;
+ routeSessionOpWaitLookup->initiatingContext = mOperation->getOperationContext();
+ routeSessionOpWaitLookup->key = mOperation->getOperationContext()->id + RouteSessionOpWaitLookupKeyMod;
routeSessionOpWaitLookup->endpoints = mOperation->getLookupResult();
+ routeSessionOpWaitLookup->sessionCreateContexts = mOperation->getCreateSessionContexts();
+ routeSessionOpWaitLookup->startContexts = mOperation->getStartSessionContexts();
pushState(routeSessionOpWaitLookup);
}
@@ -111,7 +117,7 @@ public:
RoutingStateItemSeq setItems;
setItems.push_back(item);
- mReplicationContext->getReplicator()->setState(setItems);
+ mReplicationContext->getReplicator()->setState(AsteriskSCF::Operations::createContext(), setItems);
// Cache the replication state items.
mReplicatedState.push_back(item);
@@ -136,7 +142,7 @@ public:
{
// We just completed the entire operation.
// Remove the items that represented this operation's state transitions.
- mReplicationContext->getReplicator()->removeStateForItems(mReplicatedState);
+ mReplicationContext->getReplicator()->removeStateForItems(AsteriskSCF::Operations::createContext(), mReplicatedState);
}
}
catch(...)
@@ -165,8 +171,8 @@ public:
// We just completed the bridge creation.
// Push this information to the state replicator.
RouteSessionOpBridgingStatePtr routeSessionOpBridging(new RouteSessionOpBridgingState());
- routeSessionOpBridging->operationId = mOperation->getOperationId();
- routeSessionOpBridging->key = mOperation->getOperationId() + RouteSessionOpBridgingKeyMod;
+ routeSessionOpBridging->initiatingContext = mOperation->getOperationContext();
+ routeSessionOpBridging->key = mOperation->getOperationContext()->id + RouteSessionOpBridgingKeyMod;
routeSessionOpBridging->bridge = mOperation->getBridge();
pushState(routeSessionOpBridging);
@@ -184,9 +190,9 @@ public:
private:
RoutingStateItemSeq mReplicatedState;
- std::string mTransactionId;
RouteSessionOperationPtr mOperation;
RoutingReplicationContextPtr mReplicationContext;
+ std::string mOperationId;
};
/**
@@ -211,8 +217,8 @@ void RouteSessionOperation::initStateMachine()
* This object is an instance of WorkQueue::Work so that
* it can be enqueued to a worker thread.
*/
-RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
- const std::string& operationId,
+RouteSessionOperation::RouteSessionOperation(const AMDContextData<AMD_SessionRouter_routeSessionPtr>::ptr_type& cookie,
+ const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
@@ -222,17 +228,19 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
const SessionContextPtr& context,
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
- RouteSessionOp::OperationState> (cb,
+ RouteSessionOp::OperationState> (cookie,
+ operationContext,
context,
current,
listener,
- RouteSessionOp::STATE_LOOKUP,
- operationId),
+ RouteSessionOp::STATE_LOOKUP
+ ),
mSource(source),
mDestination(destination),
mHook(oneShotHook),
mCallerID(callerID),
- mRedirects(redirects)
+ mRedirects(redirects),
+ mCreateBridgeContext(Operations::createContext(operationContext))
{
initStateMachine();
}
@@ -240,8 +248,8 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
/**
* This is the factory method for RouteSessionOperation.
*/
-RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_routeSessionPtr& cb,
- const std::string& operationId,
+RouteSessionOperationPtr RouteSessionOperation::create(const AMDContextData<AMD_SessionRouter_routeSessionPtr>::ptr_type& cookie,
+ const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
@@ -252,8 +260,8 @@ RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_r
OperationsManager* const listener)
{
- RouteSessionOperationPtr op (new RouteSessionOperation(cb,
- operationId,
+ RouteSessionOperationPtr op (new RouteSessionOperation(cookie,
+ operationContext,
source,
destination,
oneShotHook,
@@ -274,9 +282,11 @@ RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_r
/**
* Alternate constructor for replicas.
*/
-RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionContext)
- : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
- RouteSessionOp::OperationState>(sessionContext, RouteSessionOp::STATE_LOOKUP)
+RouteSessionOperation::RouteSessionOperation(
+ const OperationContextPtr& operationContext,
+ const SessionContextPtr& sessionContext)
+ : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
+ RouteSessionOp::OperationState>(operationContext, sessionContext, RouteSessionOp::STATE_LOOKUP)
{
initStateMachine();
}
@@ -284,14 +294,16 @@ RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionCon
/**
* This is the factory method for creating a replica of a RouteSessionOperation.
*/
-RouteSessionOperationPtr RouteSessionOperation::createReplica(const SessionContextPtr& sessionContext)
+RouteSessionOperationPtr RouteSessionOperation::createReplica(
+ const OperationContextPtr& operationContext,
+ const SessionContextPtr& sessionContext)
{
- RouteSessionOperationPtr op (new RouteSessionOperation(sessionContext));
+ RouteSessionOperationPtr op (new RouteSessionOperation(operationContext, sessionContext));
return op;
}
-void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr& stateItem)
+void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateItemPtr& stateItem)
{
RouteSessionOpStartPtr start;
RouteSessionOpWaitLookupStatePtr waitLookup;
@@ -317,7 +329,8 @@ void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicR
mDestination = item->destination;
mHook = item->hook;
mCallerID = item->callerID;
- mOperationId = item->operationId;
+ mOperationContext = item->initiatingContext;
+ mCreateBridgeContext = item->createBridgeContext;
mReplicatedStates.push_back(RouteSessionOp::STATE_LOOKUP);
}
@@ -325,6 +338,8 @@ void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicR
void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& item)
{
mLookupResult = item->endpoints;
+ mCreateSessionContexts = item->sessionCreateContexts;
+ mStartSessionContexts = item->startContexts;
mReplicatedStates.push_back(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
}
@@ -364,7 +379,7 @@ bool RouteSessionOperation::fastForwardReplica()
addListenerManager();
- lg(Debug) << "Fast-forwarded replica of RouteSessionOperation to STATE_BRIDGING for " << mOperationId;
+ lg(Debug) << "Fast-forwarded replica of RouteSessionOperation to STATE_BRIDGING for " << mOperationContext->id;
mStateMachine.resetStartState(RouteSessionOp::STATE_BRIDGING);
@@ -376,7 +391,8 @@ bool RouteSessionOperation::fastForwardReplica()
}
// Apparently nothing left to do but reply to the AMD callback.
- lg(Debug) << "Fast-forwarded replica of RouteSessionOperation to STATE_SEND_RESPONSE for " << mOperationId;
+ lg(Debug) << "Fast-forwarded replica of RouteSessionOperation to STATE_SEND_RESPONSE for " << mOperationContext->id;
+
mStateMachine.resetStartState(RouteSessionOp::STATE_SEND_RESPONSE);
return true;
}
@@ -391,7 +407,7 @@ RouteSessionOperation::~RouteSessionOperation()
*/
void RouteSessionOperation::addListenerManager()
{
- mListenerManager.reset(new SessionListenerManager(mSessionContext->adapter, mSource));
+ mListenerManager.reset(new SessionListenerManager(mOperationContext, mSessionContext->adapter, mSource));
}
/**
@@ -476,7 +492,7 @@ void RouteSessionOperation::establishBridgeState()
}
// Add a session to the endpoints.
- SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination, mHook, mListenerManager);
+ SessionSeq newSessions = createSessionForEndpoints(mCreateSessionContexts, mLookupResult, mDestination, mHook, mListenerManager);
if (mListenerManager->getListener()->getNumSessions() < 2)
{
@@ -505,7 +521,7 @@ void RouteSessionOperation::establishBridgeState()
bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Creating bridge.";
- bridge = mSessionContext->bridgeManager->createBridge(mSource, bridgedSessions, 0, mCallerID, mRedirects);
+ bridge = mSessionContext->bridgeManager->createBridge(mCreateBridgeContext, mSource, bridgedSessions, 0, mCallerID, mRedirects);
}
catch (const Ice::Exception &e)
{
@@ -520,7 +536,7 @@ void RouteSessionOperation::establishBridgeState()
try
{
- forwardStart(newSessions);
+ forwardStart(mStartSessionContexts, newSessions);
}
catch (const Ice::Exception &e)
{
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 6a725cb..8ca1da9 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -18,6 +18,7 @@
#include <boost/function.hpp>
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include "BasicRoutingStateReplicationIf.h"
@@ -65,8 +66,8 @@ public:
/**
* Factory method for the class. This method creates an active routing operation.
*/
- static RouteSessionOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
- const std::string& operationId,
+ static RouteSessionOperationPtr create(const AsteriskSCF::Operations::AMDContextData<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>::ptr_type& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
@@ -82,17 +83,21 @@ public:
std::string getDestination() {return mDestination;}
+ AsteriskSCF::System::V1::OperationContextPtr getCreateBridgeContext() {return mCreateBridgeContext;}
+
AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge() {return mBridge;}
/**
* Factory method for replica objects.
*/
- static RouteSessionOperationPtr createReplica(const SessionContextPtr& context);
+ static RouteSessionOperationPtr createReplica(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
/**
* Update a replica object with new state information.
*/
- void reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::OperationStateItemPtr& stateItem);
+ void reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateItemPtr& stateItem);
/**
* Set the state machine into the highest state possible based on all of the state updates
@@ -102,8 +107,8 @@ public:
protected:
// Normal constructor
- RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
- const std::string& operationId,
+ RouteSessionOperation(const AsteriskSCF::Operations::AMDContextData<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>::ptr_type& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
@@ -114,7 +119,9 @@ protected:
OperationsManager* const listener);
// Constructor for replicas.
- RouteSessionOperation(const SessionContextPtr& context);
+ RouteSessionOperation(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const SessionContextPtr& context);
private:
void initStateMachine();
@@ -183,6 +190,7 @@ private:
AsteriskSCF::SessionCommunications::PartyIdentification::V1::CallerPtr mCallerID;
AsteriskSCF::SessionCommunications::PartyIdentification::V1::RedirectionsPtr mRedirects;
std::vector<RouteSessionOp::OperationState> mReplicatedStates;
+ AsteriskSCF::System::V1::OperationContextPtr mCreateBridgeContext;
}; // class RouteSessionOperation
diff --git a/src/RoutingAdmin.cpp b/src/RoutingAdmin.cpp
index 1974426..eba84e4 100644
--- a/src/RoutingAdmin.cpp
+++ b/src/RoutingAdmin.cpp
@@ -20,6 +20,7 @@
using namespace AsteriskSCF::Core::Routing::V1;
using namespace std;
+using namespace AsteriskSCF::System::V1;
namespace AsteriskSCF
{
@@ -34,11 +35,11 @@ RoutingAdmin::RoutingAdmin(const EndpointRegistryPtr& endpointRegistry) :
/**
* Drop references to all EndpointLocators that have been registered.
*/
-void RoutingAdmin::clearEndpointLocators(const Ice::Current&)
+void RoutingAdmin::clearEndpointLocators(const OperationContextPtr& operationContext, const Ice::Current&)
{
// For now we just forward to the registry. Some type of authentication may be required
// in the future, or perhaps the access to the interface is controlled externally.
- mEndpointRegistry->clearEndpointLocators();
+ mEndpointRegistry->clearEndpointLocators(operationContext);
}
/**
@@ -46,11 +47,11 @@ void RoutingAdmin::clearEndpointLocators(const Ice::Current&)
* but site-specific scripts may make use of it.
* @param policy A site-specific policy specification.
*/
-void RoutingAdmin::setPolicy(const std::string& policy, const Ice::Current&)
+void RoutingAdmin::setPolicy(const OperationContextPtr& operationContext, const std::string& policy, const Ice::Current&)
{
// For now we just forward to the registry. Some type of authentication may be required
// in the future, or perhaps the access to the interface is controlled externally.
- mEndpointRegistry->setPolicy(policy);
+ mEndpointRegistry->setPolicy(operationContext, policy);
}
} // end BasicRoutingService
diff --git a/src/RoutingAdmin.h b/src/RoutingAdmin.h
index 9cf679a..a023ec3 100644
--- a/src/RoutingAdmin.h
+++ b/src/RoutingAdmin.h
@@ -37,14 +37,14 @@ public: // RoutingServiceAdmin overrides
/**
* Drop references to all EndpointLocators that have been registered.
*/
- void clearEndpointLocators(const ::Ice::Current&);
+ void clearEndpointLocators(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const ::Ice::Current&);
/**
* Sends a policy string to the script processor. The default implementation is a no-op,
* but site-specific scripts may make use of it.
* @param policy A site-specific policy specification.
*/
- void setPolicy(const ::std::string& policy, const ::Ice::Current&);
+ void setPolicy(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const ::std::string& policy, const ::Ice::Current&);
private:
EndpointRegistryPtr mEndpointRegistry;
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index eba8bef..ad64464 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -19,6 +19,9 @@
#include <IceStorm/IceStorm.h>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
+#include <AsteriskSCF/Operations/OperationMonitor.h>
+
#include <boost/thread/mutex.hpp>
#include "RoutingServiceEventPublisher.h"
@@ -26,6 +29,8 @@ using namespace ::std;
using namespace ::AsteriskSCF::Core::Routing::V1;
using namespace ::AsteriskSCF::System::Logging;
using namespace ::AsteriskSCF::BasicRoutingService;
+using namespace ::AsteriskSCF::System::V1;
+using namespace ::AsteriskSCF::Operations;
namespace
{
@@ -151,9 +156,11 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
/**
* Send a message to the service's event topic to report a lookup event.
+ * The operation that triggers this event is idempotent.
*/
-void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
- AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
+void RoutingServiceEventPublisher::lookupEvent(
+ const std::string& destination,
+ AsteriskSCF::Core::Routing::V1::Event::OperationResult result)
{
if (!mImpl->isInitialized())
{
@@ -162,7 +169,8 @@ void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
try
{
- mImpl->mEventTopic->lookupEvent(destination, result);
+ // Since the lookup itself is idempotent, we'll create a new context for the event publication.
+ mImpl->mEventTopic->lookupEvent(AsteriskSCF::Operations::createContext(), destination, result);
}
catch(const Ice::Exception &e)
{
@@ -174,11 +182,11 @@ void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
/**
* Send a message to the service's event topic to report the addEndpointLocator event.
*/
-void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& locatorId,
+void RoutingServiceEventPublisher::addEndpointLocatorEvent(const OperationContextPtr& sourceContext,
+ const std::string& locatorId,
const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList,
const AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx& locator,
- AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
- const Ice::Current &)
+ AsteriskSCF::Core::Routing::V1::Event::OperationResult result)
{
if (!mImpl->isInitialized())
{
@@ -187,7 +195,7 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
try
{
- mImpl->mEventTopic->addEndpointLocatorEvent(locatorId, regexList, locator, result);
+ mImpl->mEventTopic->addEndpointLocatorEvent(calculateOperationContext(sourceContext, "addLocator"), locatorId, regexList, locator, result);
}
catch(const Ice::Exception &e)
{
@@ -198,8 +206,9 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
/**
* Send a message to the service's event topic to report the removeEndpointLocator event.
*/
-void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
- AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
+void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const OperationContextPtr& sourceContext,
+ const std::string& locatorId,
+ AsteriskSCF::Core::Routing::V1::Event::OperationResult result)
{
if (!mImpl->isInitialized())
{
@@ -208,7 +217,7 @@ void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string&
try
{
- mImpl->mEventTopic->removeEndpointLocatorEvent(locatorId, result);
+ mImpl->mEventTopic->removeEndpointLocatorEvent(calculateOperationContext(sourceContext, "removeLocator"), locatorId, result);
}
catch(const Ice::Exception &e)
{
@@ -219,9 +228,10 @@ void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string&
/**
* Send a message to the service's event topic to report the setEndpointLocatorDestinationIds event.
*/
-void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const std::string& locatorId,
- const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
- const Ice::Current &)
+void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(
+ const OperationContextPtr& sourceContext,
+ const std::string& locatorId,
+ const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result)
{
if (!mImpl->isInitialized())
{
@@ -230,7 +240,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
try
{
- mImpl->mEventTopic->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, result);
+ mImpl->mEventTopic->setEndpointLocatorDestinationIdsEvent(calculateOperationContext(sourceContext, "setIds"), locatorId, regexList, result);
}
catch(const Ice::Exception &e)
{
@@ -241,7 +251,7 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
/**
* Send a message to the service's event topic to report the clearEndpointLocators event.
*/
-void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
+void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const OperationContextPtr& sourceContext)
{
if (!mImpl->isInitialized())
{
@@ -250,7 +260,7 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
try
{
- mImpl->mEventTopic->clearEndpointLocatorsEvent();
+ mImpl->mEventTopic->clearEndpointLocatorsEvent(calculateOperationContext(sourceContext, "clearLocators"));
}
catch(const Ice::Exception &e)
{
@@ -261,7 +271,8 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
/**
* Send a message to the service's event topic to report the setPolicy event.
*/
-void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
+void RoutingServiceEventPublisher::setPolicyEvent(const OperationContextPtr& sourceContext,
+ const std::string& policy)
{
if (!mImpl->isInitialized())
{
@@ -270,7 +281,7 @@ void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, con
try
{
- mImpl->mEventTopic->setPolicyEvent(policy);
+ mImpl->mEventTopic->setPolicyEvent(calculateOperationContext(sourceContext, "setPolicy"), policy);
}
catch(const Ice::Exception &e)
{
diff --git a/src/RoutingServiceEventPublisher.h b/src/RoutingServiceEventPublisher.h
index 059b6ad..7a61116 100644
--- a/src/RoutingServiceEventPublisher.h
+++ b/src/RoutingServiceEventPublisher.h
@@ -16,6 +16,7 @@
#pragma once
#include <Ice/Ice.h>
+#include <IceUtil/Handle.h>
#include <boost/shared_ptr.hpp>
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
@@ -32,7 +33,7 @@ class RoutingServiceEventPublisherPriv;
/**
* Publishes key events to the rest of the system.
*/
-class RoutingServiceEventPublisher : public ::AsteriskSCF::Core::Routing::V1::Event::RoutingEvents
+class RoutingServiceEventPublisher : public IceUtil::Shared
{
public:
RoutingServiceEventPublisher(const Ice::ObjectAdapterPtr& adapter);
@@ -41,57 +42,67 @@ public:
/**
* Send a message to the service's event topic to report a lookup event.
+ * @param operationContext The operation context to send with this event.
* @param destination The destination to be looked up.
* @param result Informs event listeners of the operations success or failure.
*/
- void lookupEvent(const std::string& destination, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
... 2559 lines suppressed ...
--
asterisk-scf/release/routing.git
More information about the asterisk-scf-commits
mailing list