[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "retry_deux" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Mar 8 22:42:32 CST 2012
branch "retry_deux" has been updated
via f11c81e06cee6a82430a96062f2fe4198a7dde70 (commit)
via d8135e434d8dd5ff81a46d42d4b1838313ce8d6d (commit)
from 11b242156f34537cfcf565e7a632fdaf52d7b49d (commit)
Summary of changes:
.../BasicRoutingStateReplicationIf.ice | 13 ++-
src/CMakeLists.txt | 2 +-
src/Component.cpp | 8 +-
src/ConnectBridgedSessionsOperation.cpp | 11 +-
src/ConnectBridgedSessionsOperation.h | 19 ++--
...nectBridgedSessionsWithDestinationOperation.cpp | 51 ++++++----
...onnectBridgedSessionsWithDestinationOperation.h | 41 ++++---
src/EndpointRegistry.cpp | 11 ++-
src/OperationReplicaCache.h | 8 +-
src/RouteSessionOperation.cpp | 44 +++++---
src/RouteSessionOperation.h | 7 +-
src/RoutingServiceEventPublisher.cpp | 5 +-
src/RoutingServiceEventPublisher.h | 2 +-
src/RoutingStateReplicatorListener.cpp | 21 ++--
src/RoutingStateReplicatorListener.h | 3 +-
src/SessionListener.cpp | 14 ++-
src/SessionListener.h | 2 +-
src/SessionRouter.cpp | 52 ++++++++--
src/SessionRouterOperation.cpp | 14 ++-
src/SessionRouterOperation.h | 111 +++++++++++++++++---
20 files changed, 303 insertions(+), 136 deletions(-)
mode change 100755 => 100644 slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
mode change 100755 => 100644 src/RoutingServiceEventPublisher.cpp
mode change 100755 => 100644 src/RoutingServiceEventPublisher.h
- Log -----------------------------------------------------------------
commit f11c81e06cee6a82430a96062f2fe4198a7dde70
Merge: d8135e4 11b2421
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Mar 8 22:41:56 2012 -0600
Merge
diff --cc slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
index 28059ae,790b20a..d917c2c
mode 100644,100755..100644
--- a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
@@@ -194,7 -187,7 +194,7 @@@ module V
AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
AsteriskSCF::Core::Routing::V1::EndpointLocator *locator;
};
--
++
}; //module V1
}; //module BasicRoutingService
}; //module Replication
diff --cc src/CMakeLists.txt
index 64fbf5f,64fbf5f..fe23784
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@@ -50,5 -50,5 +50,5 @@@ astscf_component_add_slices(BasicRoutin
astscf_component_add_ice_libraries(BasicRoutingStateReplicator IceStorm)
astscf_component_add_slice_collection_libraries(BasicRoutingStateReplicator ASTSCF)
astscf_component_build_icebox(BasicRoutingStateReplicator)
--target_link_libraries(BasicRoutingStateReplicator LoggingClient)
++target_link_libraries(BasicRoutingStateReplicator LoggingClient ASTSCFIceUtilCpp)
astscf_component_install(BasicRoutingStateReplicator)
diff --cc src/RoutingStateReplicatorListener.cpp
index c6a3ffb,1b12b11..343bde4
mode 100644,100755..100755
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
diff --cc src/SessionRouterOperation.h
index 34dabde,06bc96e..187bde1
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@@ -394,9 -315,6 +394,11 @@@ protected
AsteriskSCF::StateMachine::SimpleStateMachine<S> mStateMachine;
AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
++ AsteriskSCF::System::V1::OperationContextSeq mAddSessionListenerContexts;
++ AsteriskSCF::System::V1::OperationContextSeq mRemoveSessionListenerContexts;
+ AsteriskSCF::System::V1::OperationContextSeq mCreateSessionContexts;
+ AsteriskSCF::System::V1::OperationContextSeq mStartSessionContexts;
+ AsteriskSCF::System::V1::OperationContextPtr mUnregisterListenerContext;
}; // class SessionRouterOperation
commit d8135e434d8dd5ff81a46d42d4b1838313ce8d6d
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Mar 8 21:52:56 2012 -0600
Pre-generating and replicating outgoing OperationContexts.
- Incomplete for EndpointRegistry due to the outgoing Event contexts.
- Incomplete for ConnectBridgedSessionsOperation (which was not previously replicated, but needs to be just
to accomodate outgoing contexts).
diff --git a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
index ef00a45..28059ae 100644
--- a/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/BasicRoutingService/BasicRoutingStateReplicationIf.ice
@@ -74,12 +74,9 @@ module V1
{
idempotent void addListener(RoutingStateReplicatorListener *listener);
idempotent void removeListener(RoutingStateReplicatorListener *listener);
- idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items) throws
- AsteriskSCF::System::V1::OperationCallCancelledException;
- idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items) throws
- AsteriskSCF::System::V1::OperationCallCancelledException;
- idempotent void removeStateForItems(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items) throws
- AsteriskSCF::System::V1::OperationCallCancelledException;
+ idempotent void setState (AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items);
+ idempotent void removeState(AsteriskSCF::System::V1::OperationContext operationContext, Ice::StringSeq items);
+ idempotent void removeStateForItems(AsteriskSCF::System::V1::OperationContext operationContext, RoutingStateItemSeq items);
idempotent RoutingStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RoutingStateItemSeq getAllState();
};
@@ -89,8 +86,7 @@ module V1
*/
class OperationStateItem extends RoutingStateItem
{
- string operationId;
- string transactionId;
+ AsteriskSCF::System::V1::OperationContext initiatingContext;
};
///////////////////////////////////////////////////////////////////////
@@ -108,6 +104,8 @@ module V1
string destination;
AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHook* hook;
AsteriskSCF::SessionCommunications::PartyIdentification::V1::Caller callerID;
+ AsteriskSCF::System::V1::OperationContext unregisterListenerContext;
+ AsteriskSCF::System::V1::OperationContext createBridgeContext;
};
const string RouteSessionOpStartKeyMod = ".START";
@@ -119,6 +117,8 @@ module V1
class RouteSessionOpWaitLookupState extends OperationStateItem
{
AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+ AsteriskSCF::System::V1::OperationContextSeq sessionCreateContexts;
+ AsteriskSCF::System::V1::OperationContextSeq startContexts;
};
const string RouteSessionOpWaitLookupKeyMod = ".WAITLOOKUP";
@@ -147,6 +147,8 @@ module V1
AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
string destination;
bool replaceSession;
+ AsteriskSCF::System::V1::OperationContext unregisterListenerContext;
+ AsteriskSCF::System::V1::OperationContext addOrReplaceSessionContext;
};
const string ConnectBridgedSessionsWithDestStartKeyMod = ".START";
@@ -160,6 +162,8 @@ module V1
{
AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
AsteriskSCF::SessionCommunications::V1::SessionSeq remainingSessions;
+ AsteriskSCF::System::V1::OperationContextSeq sessionCreateContexts;
+ AsteriskSCF::System::V1::OperationContextSeq startContexts;
};
const string ConnectBridgedSessionsWithDestWaitLookupKeyMod = ".WAITLOOKUP";
diff --git a/src/Component.cpp b/src/Component.cpp
index fbd60a2..e255f79 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -109,7 +109,7 @@ private:
// Support objects.
RoutingServiceEventPublisherPtr mEventPublisher;
- boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
+ boost::shared_ptr<OperationReplicaCache> mSessionRouterOperationCache;
// Replication support
AsteriskSCF::Discovery::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
@@ -312,10 +312,10 @@ void Component::createPrimaryServices()
routingReplicationContext));
// Create the replica cache.
- mOperationReplicaCache = OperationReplicaCachePtr(new OperationReplicaCache(mSessionContext));
+ mSessionRouterOperationCache = OperationReplicaCachePtr(new OperationReplicaCache(mSessionContext));
// Create the SessionRouter interface.
- mSessionRouter = new SessionRouter(mSessionContext, mOperationReplicaCache);
+ mSessionRouter = new SessionRouter(mSessionContext, mSessionRouterOperationCache);
mSessionRouterPrx = SessionRouterPrx::uncheckedCast(
getServiceAdapter()->add(mSessionRouter, getCommunicator()->stringToIdentity(SessionRouterObjectId)));
@@ -338,7 +338,7 @@ void Component::createReplicationStateListeners()
try
{
// Create and publish our state replicator listener interface on the backplane adapter.
- mReplicatorListener = new RoutingStateReplicatorListenerImpl(mEndpointRegistry, mOperationReplicaCache);
+ mReplicatorListener = new RoutingStateReplicatorListenerImpl(mEndpointRegistry, mSessionRouterOperationCache);
prx =
RoutingStateReplicatorListenerPrx::uncheckedCast(getBackplaneAdapter()->addWithUUID(mReplicatorListener));
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index d830e87..b2695fd 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -17,6 +17,7 @@
#include <boost/bind.hpp>
#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Operations/OperationContext.h>
#include "ConnectBridgedSessionsOperation.h"
@@ -48,7 +49,7 @@ namespace BasicRoutingService
*/
ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
- (const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ (const SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsPtr>::Ptr& cookie,
const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
@@ -58,7 +59,7 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr,
ConnectBridgedSessionsOp::OperationState>
- (cb,
+ (cookie,
operationContext,
context,
current,
@@ -77,7 +78,7 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation
* Factory method for the operation.
*/
ConnectBridgedSessionsOperationPtr ConnectBridgedSessionsOperation::create
- (const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ (const SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsPtr>::Ptr& cookie,
const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
@@ -87,7 +88,7 @@ ConnectBridgedSessionsOperationPtr ConnectBridgedSessionsOperation::create
OperationsManager* const listener)
{
- ConnectBridgedSessionsOperationPtr ptr(new ConnectBridgedSessionsOperation(cb,
+ ConnectBridgedSessionsOperationPtr ptr(new ConnectBridgedSessionsOperation(cookie,
operationContext,
sessionToReplace,
bridgedSession,
@@ -152,7 +153,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
return;
}
- SessionSeq migratingSessions = removeSessionsFromBridge(mOperationContext, oldBridge, mBridgedSession);
+ SessionSeq migratingSessions = removeSessionsFromBridge(AsteriskSCF::Operations::createContext(mOperationContext), oldBridge, mBridgedSession);
SessionWithSessionInfoSeq infoSeq;
for (SessionSeq::iterator sessionIter = migratingSessions.begin(); sessionIter != migratingSessions.end(); ++sessionIter)
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index c8a244f..b0779e7 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -60,7 +60,7 @@ class ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskS
{
public:
static ConnectBridgedSessionsOperationPtr create(
- const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const SessionRouterOperationCookie<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>::Ptr& cookie,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
@@ -72,14 +72,15 @@ public:
virtual ~ConnectBridgedSessionsOperation();
protected:
- ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
- const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
- bool replaceSession,
- const Ice::Current& current,
- const SessionContextPtr& context,
- OperationsManager* const listener);
+ ConnectBridgedSessionsOperation(
+ const SessionRouterOperationCookie<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>::Ptr& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ bool replaceSession,
+ const Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
private:
/**
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index df77d51..67262f0 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -51,15 +51,14 @@ class ConnectBridgedSessionsWithDestReplicatingListener :
public:
ConnectBridgedSessionsWithDestReplicatingListener(ConnectBridgedSessionsWithDestinationOperationPtr op,
const RoutingReplicationContextPtr& replication)
- : mOperationContext(op->getOperationContext()),
- mOperation(op),
+ : mOperation(op),
mReplicationContext(replication)
{
}
~ConnectBridgedSessionsWithDestReplicatingListener()
{
- lg(Debug) << "ConnectBridgedSessionsWithDestReplicatingListener() being destroyed for operation " << mOperationContext->id;
+ lg(Debug) << "ConnectBridgedSessionsWithDestReplicatingListener() being destroyed for operation " << mOperation->getOperationContext()->id;
}
/**
@@ -79,12 +78,14 @@ public:
// Push this information to the state replicator.
ConnectBridgedSessionsWithDestinationOpStartPtr
opStart(new ConnectBridgedSessionsWithDestinationOpStart());
- opStart->operationId = mOperationContext->id;
- opStart->key = mOperationContext->id +
+ opStart->initiatingContext = mOperation->getOperationContext();
+ opStart->key = mOperation->getOperationContext()->id +
ConnectBridgedSessionsWithDestStartKeyMod;
opStart->sessionToReplace = mOperation->getSessionToReplace();
opStart->destination = mOperation->getDestination();
opStart->replaceSession = mOperation->getReplaceSession();
+ opStart->unregisterListenerContext = mOperation->getUnregisterListenerContext();
+ opStart->addOrReplaceSessionContext = mOperation->getAddOrReplaceSessionContext();
pushState(opStart);
}
@@ -95,11 +96,13 @@ public:
// We've obtained a result from our AMI lookup request.
ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr
waitLookup(new ConnectBridgedSessionsWithDestinationOpWaitLookupState());
- waitLookup->operationId = mOperationContext->id;
- waitLookup->key = mOperationContext->id +
+ waitLookup->initiatingContext = mOperation->getOperationContext();
+ waitLookup->key = mOperation->getOperationContext()->id +
RouteSessionOpWaitLookupKeyMod;
waitLookup->endpoints = mOperation->getLookupResult();
waitLookup->remainingSessions = mOperation->getRemainingSessions();
+ waitLookup->sessionCreateContexts = mOperation->getCreateSessionContexts();
+ waitLookup->startContexts = mOperation->getStartSessionContexts();
pushState(waitLookup);
}
@@ -171,8 +174,8 @@ public:
// Push this information to the state replicator.
ConnectBridgedSessionsWithDestinationOpBridgingStatePtr
bridgeOp(new ConnectBridgedSessionsWithDestinationOpBridgingState());
- bridgeOp->operationId = mOperationContext->id;
- bridgeOp->key = mOperationContext->id +
+ bridgeOp->initiatingContext = mOperation->getOperationContext();
+ bridgeOp->key = mOperation->getOperationContext()->id +
RouteSessionOpBridgingKeyMod;
bridgeOp->bridge = mOperation->getBridge();
@@ -188,7 +191,6 @@ public:
private:
RoutingStateItemSeq mReplicatedState;
- OperationContextPtr mOperationContext;
ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
RoutingReplicationContextPtr mReplicationContext;
@@ -213,7 +215,7 @@ void ConnectBridgedSessionsWithDestinationOperation::initStateMachine()
* This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread.
*/
ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(
- const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::Ptr& cookie,
const OperationContextPtr& operationContext,
const SessionPrx& sessionToReplace,
const ::std::string& destination,
@@ -223,7 +225,7 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
const SessionContextPtr& context,
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr,
- ConnectBridgedSessionsWithDestinationOp::OperationState>(cb,
+ ConnectBridgedSessionsWithDestinationOp::OperationState>(cookie,
operationContext,
context,
current,
@@ -233,7 +235,8 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
mSessionToReplace(sessionToReplace),
mDestination(destination),
mReplaceSession(replaceSession),
- mHook(oneShotHook)
+ mHook(oneShotHook),
+ mAddOrReplaceSessionContext(Operations::createContext(operationContext))
{
initStateMachine();
}
@@ -242,7 +245,7 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
* This is the factory method for this operation.
*/
ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDestinationOperation::create(
- const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::Ptr& cookie,
const OperationContextPtr& operationContext,
const SessionPrx& sessionToReplace,
const ::std::string& destination,
@@ -253,7 +256,8 @@ ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDest
OperationsManager* const listener)
{
- ConnectBridgedSessionsWithDestinationOperationPtr op( new ConnectBridgedSessionsWithDestinationOperation(cb,
+ ConnectBridgedSessionsWithDestinationOperationPtr op( new ConnectBridgedSessionsWithDestinationOperation(
+ cookie,
operationContext,
sessionToReplace,
destination,
@@ -326,8 +330,10 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
{
mSessionToReplace = item->sessionToReplace;
mDestination = item->destination;
- mOperationContext = new OperationContext(item->operationId, item->transactionId);
+ mOperationContext = item->initiatingContext;
mReplaceSession = item->replaceSession;
+ mUnregisterListenerContext = item->unregisterListenerContext;
+ mAddOrReplaceSessionContext = item->addOrReplaceSessionContext;
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
}
@@ -336,6 +342,9 @@ void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(
const ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& item)
{
mLookupResult = item->endpoints;
+ mCreateSessionContexts = item->sessionCreateContexts;
+ mStartSessionContexts = item->startContexts;
+
mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
}
@@ -467,7 +476,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
}
// Add a session
- SessionSeq newSessions = createSessionForEndpoints(mOperationContext, mLookupResult, mDestination, mHook, mListenerManager);
+ SessionSeq newSessions = createSessionForEndpoints(mCreateSessionContexts, mLookupResult, mDestination, mHook, mListenerManager);
SessionWithSessionInfoSeq infoSeq;
for (SessionSeq::iterator sessionIter = newSessions.begin(); sessionIter != newSessions.end(); ++sessionIter)
{
@@ -492,7 +501,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
// We're through listening, and we will probably interfere with the Bridge's functionality if
// we keep listening.
- mListenerManager->getListener()->unregister(mOperationContext);
+ mListenerManager->getListener()->unregister(mUnregisterListenerContext);
// Modify the bridge
try
@@ -500,12 +509,12 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
if (mReplaceSession)
{
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Replacing session with newly routed destination " << mDestination;
- mBridge->replaceSession(mOperationContext, mSessionToReplace, infoSeq);
+ mBridge->replaceSession(mAddOrReplaceSessionContext, mSessionToReplace, infoSeq);
}
else
{
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Adding newly routed destination to session's bridge " << mDestination;
- mBridge->addSessions(mOperationContext, infoSeq);
+ mBridge->addSessions(mAddOrReplaceSessionContext, infoSeq);
}
}
catch (const Ice::Exception &e)
@@ -519,7 +528,7 @@ void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
try
{
- forwardStart(mOperationContext, newSessions);
+ forwardStart(mStartSessionContexts, newSessions);
}
catch (const Ice::Exception &e)
{
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index 9231401..a7a23f8 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -67,15 +67,16 @@ public:
/**
* Factory method for the class. This method creates an active operation.
*/
- static ConnectBridgedSessionsWithDestinationOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const std::string& destination,
- bool replaceSession,
- const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
- const ::Ice::Current& current,
- const SessionContextPtr& context,
- OperationsManager* const listener);
+ static ConnectBridgedSessionsWithDestinationOperationPtr create(
+ const SessionRouterOperationCookie<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::Ptr& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ bool replaceSession,
+ const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
virtual ~ConnectBridgedSessionsWithDestinationOperation();
@@ -112,16 +113,19 @@ public:
*/
AsteriskSCF::SessionCommunications::V1::SessionSeq getRemainingSessions() {return mRemainingSessions;}
+ AsteriskSCF::System::V1::OperationContextPtr getAddOrReplaceSessionContext() {return mAddOrReplaceSessionContext;}
+
protected:
- ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const std::string& destination,
- bool replaceSession,
- const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
- const ::Ice::Current& current,
- const SessionContextPtr& context,
- OperationsManager* const listener);
+ ConnectBridgedSessionsWithDestinationOperation(
+ const SessionRouterOperationCookie<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::Ptr& cookie,
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ bool replaceSession,
+ const ::AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
// Constructor to service replicas.
ConnectBridgedSessionsWithDestinationOperation(
@@ -184,6 +188,7 @@ private:
AsteriskSCF::SessionCommunications::V1::SessionSeq mRemainingSessions;
std::vector<ConnectBridgedSessionsWithDestinationOp::OperationState> mReplicatedStates;
+ AsteriskSCF::System::V1::OperationContextPtr mAddOrReplaceSessionContext;
}; // class ConnectBridgedSessionsWithDestinationOperation
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index be6b387..0ef8e37 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -90,7 +90,7 @@ public:
mScriptProcessor(scriptProcessor),
mEventPublisher(eventPublisher),
mReplicationContext(replicationContext),
- mOperationContextCache(new OperationContextCache(180))
+ mOperationContextCache(OperationContextCache::create(180))
{
}
@@ -149,6 +149,7 @@ public:
EndpointLocatorStatePtr addEndpointItem(new EndpointLocatorState());
addEndpointItem->key = locatorId;
+ addEndpointItem->initiatingContext = operationContext;
removeItems.push_back(addEndpointItem);
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Sending replicator state removal for locator " << locatorId;
@@ -190,6 +191,7 @@ public:
addEndpointItem->key = locatorId;
addEndpointItem->locator = locator;
addEndpointItem->regExList = regexList;
+ addEndpointItem->initiatingContext = operationContext;
setItems.push_back(addEndpointItem);
@@ -239,6 +241,7 @@ public:
addEndpointItem->key = locatorId;
addEndpointItem->locator = locator;
addEndpointItem->regExList = regexList;
+ addEndpointItem->initiatingContext = operationContext;
setItems.push_back(addEndpointItem);
@@ -327,7 +330,7 @@ public:
lg(Debug) << "EndpointRegistry::lookup() found Endpoint for destination " << mDestination;
// Post event
- mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
+ mEventPublisher->lookupEvent(createContext(), mDestination, Event::SUCCESS);
}
assert(mNumVotes > 0);
@@ -357,7 +360,7 @@ public:
mCallback = 0;
// Post event
- mEventPublisher->lookupEvent(mDestination, Event::FAILURE);
+ mEventPublisher->lookupEvent(createContext(), mDestination, Event::FAILURE);
lg(Debug) << "EndpointRegistry::lookup() failed to find destination " << mDestination;
}
@@ -398,7 +401,7 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
{
if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
{
- mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
+ mImpl->mEventPublisher->lookupEvent(createContext(), destination, Event::FAILURE);
lg(Error) << "lookup(): denied by confirmLookup() script.";
amdcallback->ice_response(endpoints);
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
index a3ba15f..5a272c5 100644
--- a/src/OperationReplicaCache.h
+++ b/src/OperationReplicaCache.h
@@ -76,14 +76,14 @@ public:
boost::unique_lock<boost::shared_mutex> lock(mLock);
// See if this operation is in the cache.
- typename OpMapType::iterator i = mReplicas.find(item->operationId);
+ typename OpMapType::iterator i = mReplicas.find(item->initiatingContext->id);
if (i == mReplicas.end())
{
// Add an entry to the cache.
OperationReplicaItem< boost::shared_ptr<O> > replica;
- mReplicas[item->operationId] = replica;
+ mReplicas[item->initiatingContext->id] = replica;
- i = mReplicas.find(item->operationId);
+ i = mReplicas.find(item->initiatingContext->id);
}
// Add this item to the replica's collection of state items.
@@ -92,7 +92,7 @@ public:
// If we haven't created the replica yet, do so now.
if ((*i).second.mOperation.get() == 0)
{
- boost::shared_ptr<O> workPtr(O::createReplica(new OperationContext(item->operationId,item->transactionId), mSessionContext));
+ boost::shared_ptr<O> workPtr(O::createReplica(item->initiatingContext, mSessionContext));
(*i).second.mOperation = workPtr;
}
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 67526b1..56d029e 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -53,15 +53,14 @@ class RouteSessionReplicatingListener :
public:
RouteSessionReplicatingListener(RouteSessionOperationPtr op,
const RoutingReplicationContextPtr& replicationContext)
- : mOperationContext(op->getOperationContext()),
- mOperation(op),
+ : mOperation(op),
mReplicationContext(replicationContext)
{
}
~RouteSessionReplicatingListener()
{
- lg(Debug) << "RouteSessionReplicatingListener() being destroyed for operation context " << mOperationContext;
+ lg(Debug) << "RouteSessionReplicatingListener() being destroyed for operation context " << mOperation->getOperationContext()->id;
}
/**
@@ -78,13 +77,16 @@ public:
{
case RouteSessionOp::STATE_LOOKUP:
{
- // This is the initial state. All the state of interest is what's been passed in.
+ // This is the initial state. All the state of interest is what's been passed in
+ // and our pre-computed operation contexts for outgoing calls.
// Push this information to the state replicator.
RouteSessionOpStartPtr routeSessionOpStart(new RouteSessionOpStart());
- routeSessionOpStart->operationId = mOperation->getOperationContext()->id;
+ routeSessionOpStart->initiatingContext = mOperation->getOperationContext();
routeSessionOpStart->key = mOperation->getOperationContext()->id + RouteSessionOpStartKeyMod;
routeSessionOpStart->source = mOperation->getSource();
routeSessionOpStart->destination = mOperation->getDestination();
+ routeSessionOpStart->unregisterListenerContext = mOperation->getUnregisterListenerContext();
+ routeSessionOpStart->createBridgeContext = mOperation->getCreateBridgeContext();
pushState(routeSessionOpStart);
}
@@ -94,9 +96,11 @@ public:
{
// We've obtained a result from our AMI lookup request.
RouteSessionOpWaitLookupStatePtr routeSessionOpWaitLookup(new RouteSessionOpWaitLookupState());
- routeSessionOpWaitLookup->operationId = mOperation->getOperationContext()->id;
+ routeSessionOpWaitLookup->initiatingContext = mOperation->getOperationContext();
routeSessionOpWaitLookup->key = mOperation->getOperationContext()->id + RouteSessionOpWaitLookupKeyMod;
routeSessionOpWaitLookup->endpoints = mOperation->getLookupResult();
+ routeSessionOpWaitLookup->sessionCreateContexts = mOperation->getCreateSessionContexts();
+ routeSessionOpWaitLookup->startContexts = mOperation->getStartSessionContexts();
pushState(routeSessionOpWaitLookup);
}
@@ -166,7 +170,7 @@ public:
// We just completed the bridge creation.
// Push this information to the state replicator.
RouteSessionOpBridgingStatePtr routeSessionOpBridging(new RouteSessionOpBridgingState());
- routeSessionOpBridging->operationId = mOperation->getOperationContext()->id;
+ routeSessionOpBridging->initiatingContext = mOperation->getOperationContext();
routeSessionOpBridging->key = mOperation->getOperationContext()->id + RouteSessionOpBridgingKeyMod;
routeSessionOpBridging->bridge = mOperation->getBridge();
@@ -185,7 +189,6 @@ public:
private:
RoutingStateItemSeq mReplicatedState;
- OperationContextPtr mOperationContext;
RouteSessionOperationPtr mOperation;
RoutingReplicationContextPtr mReplicationContext;
};
@@ -212,7 +215,7 @@ void RouteSessionOperation::initStateMachine()
* This object is an instance of WorkQueue::Work so that
* it can be enqueued to a worker thread.
*/
-RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
+RouteSessionOperation::RouteSessionOperation(const SessionRouterOperationCookie<AMD_SessionRouter_routeSessionPtr>::Ptr& cookie,
const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
@@ -223,7 +226,7 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
const SessionContextPtr& context,
OperationsManager* const listener)
: SessionRouterOperation<AMD_SessionRouter_routeSessionPtr,
- RouteSessionOp::OperationState> (cb,
+ RouteSessionOp::OperationState> (cookie,
operationContext,
context,
current,
@@ -234,7 +237,8 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
mDestination(destination),
mHook(oneShotHook),
mCallerID(callerID),
- mRedirects(redirects)
+ mRedirects(redirects),
+ mCreateBridgeContext(Operations::createContext(operationContext))
{
initStateMachine();
}
@@ -242,7 +246,7 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
/**
* This is the factory method for RouteSessionOperation.
*/
-RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_routeSessionPtr& cb,
+RouteSessionOperationPtr RouteSessionOperation::create(const SessionRouterOperationCookie<AMD_SessionRouter_routeSessionPtr>::Ptr& cookie,
const OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
@@ -254,7 +258,7 @@ RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_r
OperationsManager* const listener)
{
- RouteSessionOperationPtr op (new RouteSessionOperation(cb,
+ RouteSessionOperationPtr op (new RouteSessionOperation(cookie,
operationContext,
source,
destination,
@@ -323,7 +327,9 @@ void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicR
mDestination = item->destination;
mHook = item->hook;
mCallerID = item->callerID;
- mOperationContext = new OperationContext(item->operationId, item->transactionId);
+ mOperationContext = item->initiatingContext;
+ mUnregisterListenerContext = item->unregisterListenerContext;
+ mCreateBridgeContext = item->createBridgeContext;
mReplicatedStates.push_back(RouteSessionOp::STATE_LOOKUP);
}
@@ -331,6 +337,8 @@ void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicR
void RouteSessionOperation::reflectUpdate(const AsteriskSCF::Replication::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr& item)
{
mLookupResult = item->endpoints;
+ mCreateSessionContexts = item->sessionCreateContexts;
+ mStartSessionContexts = item->startContexts;
mReplicatedStates.push_back(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
}
@@ -483,7 +491,7 @@ void RouteSessionOperation::establishBridgeState()
}
// Add a session to the endpoints.
- SessionSeq newSessions = createSessionForEndpoints(mOperationContext, mLookupResult, mDestination, mHook, mListenerManager);
+ SessionSeq newSessions = createSessionForEndpoints(mCreateSessionContexts, mLookupResult, mDestination, mHook, mListenerManager);
if (mListenerManager->getListener()->getNumSessions() < 2)
{
@@ -499,7 +507,7 @@ void RouteSessionOperation::establishBridgeState()
// We're through listening, and we will probably interfere with the
// Bridge's functionality if we keep listening.
- mListenerManager->getListener()->unregister(mOperationContext);
+ mListenerManager->getListener()->unregister(mUnregisterListenerContext);
// Create the bridge
BridgePrx bridge;
@@ -512,7 +520,7 @@ void RouteSessionOperation::establishBridgeState()
bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
lg(Debug) << BOOST_CURRENT_FUNCTION << ": Creating bridge.";
- bridge = mSessionContext->bridgeManager->createBridge(mOperationContext, mSource, bridgedSessions, 0, mCallerID, mRedirects);
+ bridge = mSessionContext->bridgeManager->createBridge(mCreateBridgeContext, mSource, bridgedSessions, 0, mCallerID, mRedirects);
}
catch (const Ice::Exception &e)
{
@@ -527,7 +535,7 @@ void RouteSessionOperation::establishBridgeState()
try
{
- forwardStart(mOperationContext, newSessions);
+ forwardStart(mStartSessionContexts, newSessions);
}
catch (const Ice::Exception &e)
{
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index a900541..01e4620 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -65,7 +65,7 @@ public:
/**
* Factory method for the class. This method creates an active routing operation.
*/
- static RouteSessionOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ static RouteSessionOperationPtr create(const SessionRouterOperationCookie<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>::Ptr& cookie,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
@@ -82,6 +82,8 @@ public:
std::string getDestination() {return mDestination;}
+ AsteriskSCF::System::V1::OperationContextPtr getCreateBridgeContext() {return mCreateBridgeContext;}
+
AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge() {return mBridge;}
/**
@@ -104,7 +106,7 @@ public:
protected:
// Normal constructor
- RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ RouteSessionOperation(const SessionRouterOperationCookie<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>::Ptr& cookie,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
@@ -187,6 +189,7 @@ private:
AsteriskSCF::SessionCommunications::PartyIdentification::V1::CallerPtr mCallerID;
AsteriskSCF::SessionCommunications::PartyIdentification::V1::RedirectionsPtr mRedirects;
std::vector<RouteSessionOp::OperationState> mReplicatedStates;
+ AsteriskSCF::System::V1::OperationContextPtr mCreateBridgeContext;
}; // class RouteSessionOperation
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index a14f5f1..eb58a38 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -156,6 +156,7 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
* Send a message to the service's event topic to report a lookup event.
*/
void RoutingServiceEventPublisher::lookupEvent(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
@@ -166,7 +167,7 @@ void RoutingServiceEventPublisher::lookupEvent(
try
{
- mImpl->mEventTopic->lookupEvent(destination, result);
+ mImpl->mEventTopic->lookupEvent(AsteriskSCF::Operations::createContext(operationContext), destination, result);
}
catch(const Ice::Exception &e)
{
diff --git a/src/RoutingServiceEventPublisher.h b/src/RoutingServiceEventPublisher.h
index 77158e6..4ee2018 100644
--- a/src/RoutingServiceEventPublisher.h
+++ b/src/RoutingServiceEventPublisher.h
@@ -45,6 +45,7 @@ public:
* @param result Informs event listeners of the operations success or failure.
*/
void lookupEvent(
+ const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current&);
diff --git a/src/RoutingStateReplicatorListener.cpp b/src/RoutingStateReplicatorListener.cpp
index 59e1257..c6a3ffb 100644
--- a/src/RoutingStateReplicatorListener.cpp
+++ b/src/RoutingStateReplicatorListener.cpp
@@ -47,11 +47,12 @@ namespace BasicRoutingService
class RoutingStateReplicatorListenerPriv
{
public:
- RoutingStateReplicatorListenerPriv(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+ RoutingStateReplicatorListenerPriv(const EndpointRegistryPtr& registry,
+ const boost::shared_ptr<OperationReplicaCache>& operationReplicaCache)
: mId(IceUtil::generateUUID()),
mEndpointRegistry(registry),
- mOperationReplicaCache(opCache),
- mOperationContextCache(new OperationContextCache(180))
+ mOperationReplicaCache(operationReplicaCache),
+ mOperationContextCache(OperationContextCache::create(180))
{
}
@@ -117,7 +118,7 @@ public:
void visitRouteSessionOpStart(const RouteSessionOpStartPtr& opState)
{
// The operation cache keeps all the collected state for an operation under the transaction id.
- mImpl->mOperationReplicaCache->getRouteSessionCache()->dropOperation(opState->operationId);
+ mImpl->mOperationReplicaCache->getRouteSessionCache()->dropOperation(opState->initiatingContext->id);
}
void visitRouteSessionOpWaitLookupState(const RouteSessionOpWaitLookupStatePtr& )
@@ -132,7 +133,7 @@ public:
void visitConnectBridgedSessionsWithDestinationOpStart(const ConnectBridgedSessionsWithDestinationOpStartPtr& opState)
{
- mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->dropOperation(opState->operationId);
+ mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->dropOperation(opState->initiatingContext->id);
}
void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr& )
@@ -147,7 +148,7 @@ public:
void visitEndpointLocatorState(const EndpointLocatorStatePtr& item)
{
- mImpl->mEndpointRegistry->removeEndpointLocator(new OperationContext(item->operationId, item->transactionId), item->key, ::Ice::Current());
+ mImpl->mEndpointRegistry->removeEndpointLocator(item->initiatingContext, item->key, ::Ice::Current());
}
}; // end method-local visitor def
@@ -260,7 +261,7 @@ public:
void visitEndpointLocatorState(const EndpointLocatorStatePtr& item)
{
- mImpl->mEndpointRegistry->addEndpointLocator(new OperationContext(item->operationId, item->transactionId), item->key, item->regExList, item->locator, mCurrent);
+ mImpl->mEndpointRegistry->addEndpointLocator(item->initiatingContext, item->key, item->regExList, item->locator, mCurrent);
}
}; // end method-local visitor def
@@ -280,8 +281,10 @@ public:
OperationContextCachePtr mOperationContextCache;
};
-RoutingStateReplicatorListenerImpl::RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
- : mImpl(new RoutingStateReplicatorListenerPriv(registry, opCache))
+RoutingStateReplicatorListenerImpl::RoutingStateReplicatorListenerImpl(
+ const EndpointRegistryPtr& registry,
+ const boost::shared_ptr<OperationReplicaCache>& sessionRouterOperationCache)
+ : mImpl(new RoutingStateReplicatorListenerPriv(registry, sessionRouterOperationCache))
{
}
diff --git a/src/RoutingStateReplicatorListener.h b/src/RoutingStateReplicatorListener.h
index 1b16388..eceb4fa 100644
--- a/src/RoutingStateReplicatorListener.h
+++ b/src/RoutingStateReplicatorListener.h
@@ -40,7 +40,8 @@ class RoutingStateReplicatorListenerPriv;
class RoutingStateReplicatorListenerImpl : public AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateReplicatorListener
{
public:
- RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
+ RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry,
+ const boost::shared_ptr<OperationReplicaCache>& operationReplicaCache);
~RoutingStateReplicatorListenerImpl();
void stateRemoved(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const Ice::StringSeq&, const Ice::Current&);
void stateRemovedForItems(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, const AsteriskSCF::Replication::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 26e3b39..e20b765 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -178,9 +178,10 @@ SessionListenerManager::SessionListenerManager(
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const Ice::ObjectAdapterPtr& adapter,
const SessionPrx& session)
- : mSessionListener(new SessionListenerImpl()),
- mAdapter(adapter),
- mOperationContext(operationContext)
+ : mOperationContext(operationContext),
+ mSessionListener(new SessionListenerImpl()),
+ mAdapter(adapter)
+
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
@@ -210,9 +211,10 @@ SessionListenerManager::SessionListenerManager(
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const Ice::ObjectAdapterPtr& adapter,
const SessionSeq& sessionSequence)
- : mSessionListener(new SessionListenerImpl()),
- mAdapter(adapter),
- mOperationContext(operationContext)
+ : mOperationContext(operationContext),
+ mSessionListener(new SessionListenerImpl()),
+ mAdapter(adapter)
+
{
Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
diff --git a/src/SessionListener.h b/src/SessionListener.h
index 4996b3b..738abaa 100644
--- a/src/SessionListener.h
+++ b/src/SessionListener.h
@@ -101,9 +101,9 @@ public:
SessionListenerImpl* getListener() const;
private:
+ AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
SessionListenerImpl* mSessionListener;
Ice::ObjectAdapterPtr mAdapter;
- AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
}; // class SessionListenerManager
typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 79d8851..1b51d69 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -73,6 +73,7 @@ public:
SessionRouterPriv(const SessionContextPtr& sessionContext,
const boost::shared_ptr<OperationReplicaCache>& operationCache) :
mSessionContext(sessionContext),
+ mOperationContextCache(OperationContextCache::create(180)),
mOperationReplicaCache(operationCache)
{
}
@@ -174,8 +175,20 @@ void SessionRouter::routeSession_async(const AMD_SessionRouter_routeSessionPtr&
const RedirectionsPtr& redirects,
const ::Ice::Current& current)
{
- if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ SessionRouterOperationCookie<AMD_SessionRouter_routeSessionPtr>::Ptr newCookie(new
+ SessionRouterOperationCookie<AMD_SessionRouter_routeSessionPtr>(cb));
+
+ OperationContextCookiePtr existingCookie;
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext, newCookie, existingCookie))
{
+ SessionRouterOperationCookie<AMD_SessionRouter_routeSessionPtr>::Ptr cookie =
+ boost::dynamic_pointer_cast<SessionRouterOperationCookie<AMD_SessionRouter_routeSessionPtr> >(existingCookie);
+
+ if (cookie != 0)
+ {
+ cookie->addCallback(cb);
+ }
+
lg(Debug) << "Duplicate routeSession() operation detected and rejected for operation " << operationContext->id;
return;
}
@@ -184,14 +197,14 @@ void SessionRouter::routeSession_async(const AMD_SessionRouter_routeSessionPtr&
RouteSessionOperationPtr routeSessionOp;
if (mImpl->mOperationReplicaCache->getRouteSessionCache()->fetchOperation(operationContext->id, routeSessionOp))
{
- routeSessionOp->rehostReplica(cb, current, mImpl.get());
+ routeSessionOp->rehostReplica(newCookie, current, mImpl.get());
WorkPtr replicaOp(routeSessionOp);
mImpl->scheduleOperation(replicaOp);
return;
}
- WorkPtr op(RouteSessionOperation::create(cb,
+ WorkPtr op(RouteSessionOperation::create(newCookie,
operationContext,
source,
destination,
@@ -223,8 +236,20 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const AMD_Sessio
const SessionCreationHookPrx& oneShotHook,
const ::Ice::Current& current)
{
- if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::Ptr newCookie(new
+ SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>(cb));
+
+ OperationContextCookiePtr existingCookie;
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext, newCookie, existingCookie))
{
+ SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>::Ptr cookie =
+ boost::dynamic_pointer_cast<SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr> >(existingCookie);
+
+ if (cookie != 0)
+ {
+ cookie->addCallback(cb);
+ }
+
lg(Debug) << "Duplicate connectBridgedSessionsWithDestination() operation detected and rejected for operation " << operationContext->id;
return;
}
@@ -233,14 +258,14 @@ void SessionRouter::connectBridgedSessionsWithDestination_async(const AMD_Sessio
ConnectBridgedSessionsWithDestinationOperationPtr connectBridgedSessionsWithDestOp;
if (mImpl->mOperationReplicaCache->getConnectBridgedSessionsWithDestCache()->fetchOperation(operationContext->id, connectBridgedSessionsWithDestOp))
{
- connectBridgedSessionsWithDestOp->rehostReplica(cb, current, mImpl.get());
+ connectBridgedSessionsWithDestOp->rehostReplica(newCookie, current, mImpl.get());
WorkPtr replicaOp(connectBridgedSessionsWithDestOp);
mImpl->scheduleOperation(replicaOp);
return;
}
- WorkPtr op(ConnectBridgedSessionsWithDestinationOperation::create(cb,
+ WorkPtr op(ConnectBridgedSessionsWithDestinationOperation::create(newCookie,
operationContext,
sessionToReplace,
destination,
@@ -263,13 +288,24 @@ void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCom
bool replaceSession,
const ::Ice::Current& current)
{
- if (!mImpl->mOperationContextCache->addOperationContext(operationContext))
+ SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsPtr>::Ptr newCookie(new
+ SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsPtr>(cb));
+
+ OperationContextCookiePtr existingCookie;
+ if (!mImpl->mOperationContextCache->addOperationContext(operationContext, newCookie, existingCookie))
{
+ SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsPtr>::Ptr cookie =
+ boost::dynamic_pointer_cast<SessionRouterOperationCookie<AMD_SessionRouter_connectBridgedSessionsPtr> >(existingCookie);
+
+ if (cookie != 0)
+ {
+ cookie->addCallback(cb);
+ }
lg(Debug) << "Duplicate connectBridgedSessions() operation detected and rejected for operation " << operationContext->id;
return;
}
- WorkPtr op(ConnectBridgedSessionsOperation::create(cb,
+ WorkPtr op(ConnectBridgedSessionsOperation::create(newCookie,
operationContext,
sessionToReplace,
bridgedSession,
diff --git a/src/SessionRouterOperation.cpp b/src/SessionRouterOperation.cpp
index 49ab64a..7a24f48 100644
--- a/src/SessionRouterOperation.cpp
+++ b/src/SessionRouterOperation.cpp
@@ -46,11 +46,12 @@ namespace BasicRoutingService
* Forward the start() operation to all sessions in a given sequence.
* Caller should catch Ice::Exception
*/
-void forwardStart(const OperationContextPtr& operationContext, SessionSeq& sessions)
+void forwardStart(const OperationContextSeq& operationContexts, SessionSeq& sessions)
{
- for (SessionSeq::iterator s = sessions.begin(); s != sessions.end(); ++s)
+ int i=0;
+ for (SessionSeq::iterator s = sessions.begin(); s != sessions.end(); ++s, ++i)
{
- (*s)->start(createContext(operationContext)); // Caller should catch Ice::Exception
+ (*s)->start(operationContexts[i]); // Caller should catch Ice::Exception
}
}
@@ -97,7 +98,7 @@ BridgePrx getBridge(SessionPrx session)
* newly added sessions.
*/
SessionSeq createSessionForEndpoints(
- const OperationContextPtr& operationContext,
+ const OperationContextSeq& operationContexts,
const EndpointSeq& endpoints,
const std::string& destination,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
@@ -105,7 +106,8 @@ SessionSeq createSessionForEndpoints(
{
// Add a session
SessionSeq newSessions;
- for (EndpointSeq::const_iterator e = endpoints.begin(); e != endpoints.end(); ++e)
+ int i=0;
+ for (EndpointSeq::const_iterator e = endpoints.begin(); e != endpoints.end(); ++e, ++i)
{
try
{
@@ -113,7 +115,7 @@ SessionSeq createSessionForEndpoints(
// Create a session on the destination.
lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
- SessionPrx destSession = sessionEndpoint->createSession(operationContext, destination, listenerManager->getListener()->getProxy(), oneShotHook);
+ SessionPrx destSession = sessionEndpoint->createSession(operationContexts[i], destination, listenerManager->getListener()->getProxy(), oneShotHook);
if(!destSession)
{
lg(Debug) << " Session endpoint returned a null proxy, continuing with other endpoints";
diff --git a/src/SessionRouterOperation.h b/src/SessionRouterOperation.h
index 06bc96e..34dabde 100644
--- a/src/SessionRouterOperation.h
+++ b/src/SessionRouterOperation.h
@@ -28,6 +28,7 @@
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/System/OperationsIf.h>
+#include <AsteriskSCF/Operations/OperationContextCache.h>
#include "RoutingReplicationContext.h"
@@ -99,6 +100,41 @@ protected:
// Forward-declaration
template <typename T, typename S> class LookupCallback;
+/**
+ * This template provides a cookie to associate with a SessionRouter operation.
+ * The cookie collects AMD callback objects (of type T) so that retries of
+ * operations can be called back.
+ */
+template<typename T>
+class SessionRouterOperationCookie : public AsteriskSCF::Operations::OperationContextCookie
+{
+public:
+ SessionRouterOperationCookie(const T& callback)
+ {
+ mCallbacks.push_back(callback);
+ }
+
+ ~SessionRouterOperationCookie() {}
+
+ void addCallback(const T& callback)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mCallbacks.push_back(callback);
+ }
+
+ const std::vector<T>& getCallbacks()
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ return mCallbacks;
+ }
+
+ typedef boost::shared_ptr<SessionRouterOperationCookie<T> > Ptr;
+
+private:
+ std::vector<T> mCallbacks;
+ boost::shared_mutex mLock;
+};
+
/**
* This is a base class for worker objects that offload SessionRouter operations
* to a worker thead during an AMD invocation. It implements the WorkQueue::Work
@@ -112,25 +148,26 @@ class SessionRouterOperation : public AsteriskSCF::Threading::WorkQueue::Work
public:
/**
* Constructor.
- * @param amdCallback The callback object to provide results to the initiator of this operation.
+ * @param cookie Contains the callback references needed to reply to the caller(s).
* @param operationContext Unique ID for this operation as assigned by the caller.
* @param context The SessionContext provides references to key objects needed by each operation.
* @param manager
* @param defaultState The initial state of the operation's state machine.
*/
- SessionRouterOperation(const T& amdCallback,
+ SessionRouterOperation(const typename SessionRouterOperationCookie<T>::Ptr& cookie,
const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
const SessionContextPtr& context,
const ::Ice::Current& current,
OperationsManager* manager,
S defaultState)
- : mInitiatorCallback(amdCallback),
+ : mCookie(cookie),
mOperationContext(operationContext),
mSessionContext(context),
mIceCurrent(current),
mFinished(false),
mOperationsManager(manager),
- mStateMachine(defaultState)
+ mStateMachine(defaultState),
+ mUnregisterListenerContext(AsteriskSCF::Operations::createContext(operationContext))
{
}
@@ -176,8 +213,14 @@ public:
*/
void finishWithException(const ::std::exception& e)
{
- // Forward to this operation's initiator.
- mInitiatorCallback->ice_exception(e);
+ // Forward to this operation's callers. (Support plural in case of retries).
+
+ std::vector<T> callbacks = mCookie->getCallbacks();
+ for(std::vector<T>::iterator it = callbacks.begin(); it != callbacks.end(); ++it)
+ {
+ (*it)->ice_exception(e);
+ }
+
finish();
}
@@ -187,7 +230,12 @@ public:
*/
void finishWithException()
{
- mInitiatorCallback->ice_exception();
+ std::vector<T> callbacks = mCookie->getCallbacks();
+ for(std::vector<T>::iterator it = callbacks.begin(); it != callbacks.end(); ++it)
+ {
+ (*it)->ice_exception();
+ }
+
finish();
}
@@ -197,7 +245,12 @@ public:
*/
void finishAndSendResult()
{
- mInitiatorCallback->ice_response();
+ std::vector<T> callbacks = mCookie->getCallbacks();
+ for(std::vector<T>::iterator it = callbacks.begin(); it != callbacks.end(); ++it)
+ {
+ (*it)->ice_response();
+ }
+
finish();
}
@@ -255,6 +308,17 @@ public:
}
mLookupResult = endpoints;
+
+ // Pre-allocate the operationContexts to be used when
+ // creating sessions for the endpoints so that they
+ // can be replicated.
+ for (int i=0; i < mLookupResult.size(); ++i)
+ {
+ mCreateSessionContexts.push_back(AsteriskSCF::Operations::createContext(mOperationContext));
+ mStartSessionContexts.push_back(AsteriskSCF::Operations::createContext(mOperationContext));
+ }
+ // Also need to start the source, so one more for this set.
+ mStartSessionContexts.push_back(AsteriskSCF::Operations::createContext(mOperationContext));
// Reschedule this operation to continue.
try
@@ -296,15 +360,30 @@ public:
/**
* Sets a replicated operation's non-replicated state related to the specific host process.
*/
- void rehostReplica(const T& amdCallback, const ::Ice::Current& current, OperationsManager* manager)
+ void rehostReplica(const typename SessionRouterOperationCookie<T>::Ptr& cookie, const ::Ice::Current& current, OperationsManager* manager)
{
- mInitiatorCallback = amdCallback;
+ mCookie = cookie;
mIceCurrent = current;
mOperationsManager = manager;
}
+ AsteriskSCF::System::V1::OperationContextSeq getCreateSessionContexts()
+ {
+ return mCreateSessionContexts;
+ }
+
+ AsteriskSCF::System::V1::OperationContextSeq getStartSessionContexts()
+ {
+ return mStartSessionContexts;
+ }
+
+ AsteriskSCF::System::V1::OperationContextPtr getUnregisterListenerContext()
+ {
+ return mUnregisterListenerContext;
+ }
+
protected:
- T mInitiatorCallback;
+ typename SessionRouterOperationCookie<T>::Ptr mCookie;
AsteriskSCF::System::V1::OperationContextPtr mOperationContext;
SessionContextPtr mSessionContext;
::Ice::Current mIceCurrent;
@@ -315,6 +394,9 @@ protected:
AsteriskSCF::StateMachine::SimpleStateMachine<S> mStateMachine;
AsteriskSCF::Core::Endpoint::V1::EndpointSeq mLookupResult;
+ AsteriskSCF::System::V1::OperationContextSeq mCreateSessionContexts;
+ AsteriskSCF::System::V1::OperationContextSeq mStartSessionContexts;
+ AsteriskSCF::System::V1::OperationContextPtr mUnregisterListenerContext;
}; // class SessionRouterOperation
@@ -365,7 +447,8 @@ private:
/**
* Forward the start() operation to all sessions in a given sequence.
*/
-void forwardStart(const AsteriskSCF::System::V1::OperationContextPtr& operationContext, AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+void forwardStart(const AsteriskSCF::System::V1::OperationContextSeq& operationContexts,
+ AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
/**
* Provide access to the bridge for a given session.
@@ -377,7 +460,7 @@ AsteriskSCF::SessionCommunications::V1::BridgePrx getBridge(AsteriskSCF::Session
* newly added sessions.
*/
AsteriskSCF::SessionCommunications::V1::SessionSeq createSessionForEndpoints(
- const AsteriskSCF::System::V1::OperationContextPtr& operationContext,
+ const AsteriskSCF::System::V1::OperationContextSeq& operationContexts,
const AsteriskSCF::Core::Endpoint::V1::EndpointSeq& endpoints,
const std::string& destination,
const AsteriskSCF::SessionCommunications::ExtensionPoints::V1::SessionCreationHookPrx& oneShotHook,
-----------------------------------------------------------------------
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list