[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_replica" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Apr 12 20:35:59 CDT 2011
branch "route_replica" has been updated
via cc3b28f14b514d647fa4933a0a54c6c13705191b (commit)
from 87dec489be1edc35387c4a774a33b4a59ea68112 (commit)
Summary of changes:
local-slice/BasicRoutingStateReplicationIf.ice | 127 ++++++------
local-slice/CMakeLists.txt | 4 +
src/BasicRoutingServiceApp.cpp | 40 +++-
src/BasicRoutingStateReplicator.h | 3 +-
src/BasicRoutingStateReplicatorApp.cpp | 4 +-
src/BasicRoutingStateReplicatorListener.cpp | 75 ++++++--
src/CMakeLists.txt | 4 +-
src/ConnectBridgedSessionsOperation.cpp | 43 ++++-
src/ConnectBridgedSessionsOperation.h | 19 ++-
...nectBridgedSessionsWithDestinationOperation.cpp | 216 +++++++++++++++++++-
...onnectBridgedSessionsWithDestinationOperation.h | 54 ++++-
src/EndpointRegistry.cpp | 137 ++++++++++++-
src/OperationReplicaCache.cpp | 195 ++++++++++++++++++
src/OperationReplicaCache.h | 62 ++++++
src/RouteSessionOperation.cpp | 188 +++++++++++++-----
src/RouteSessionOperation.h | 65 +++++-
src/RoutingServiceEventPublisher.cpp | 48 -----
src/RoutingServiceEventPublisher.h | 2 -
src/SessionRouter.cpp | 106 +++-------
src/SessionRouter.h | 16 +-
src/SessionRouterOperation.h | 55 ++++-
21 files changed, 1142 insertions(+), 321 deletions(-)
create mode 100644 local-slice/CMakeLists.txt
create mode 100644 src/OperationReplicaCache.cpp
create mode 100644 src/OperationReplicaCache.h
- Log -----------------------------------------------------------------
commit cc3b28f14b514d647fa4933a0a54c6c13705191b
Author: Ken Hunt <ken.hunt at digium.com>
Date: Tue Apr 12 20:34:53 2011 -0500
Big fix for caching the replicated operations.
diff --git a/local-slice/BasicRoutingStateReplicationIf.ice b/local-slice/BasicRoutingStateReplicationIf.ice
index 0a0d740..4b696c1 100644
--- a/local-slice/BasicRoutingStateReplicationIf.ice
+++ b/local-slice/BasicRoutingStateReplicationIf.ice
@@ -35,19 +35,39 @@ module V1
string name;
};
- class RoutingStateItem
+ ///////////////////////////////////////////////////////////////////////
+ // These classes and interfaces implement the replication
+ // pattern of Asterisk SCF.
+
+ ["visitor"] local class RoutingStateItemVisitor
+ {
+ };
+
+ /**
+ * Base class for an item that will be replicated.
+ * The key will be unique among all state items stored in the
+ * state replicator.
+ */
+ ["visitor:RoutingStateItemVisitor"] class RoutingStateItem
{
string key;
};
sequence<RoutingStateItem> RoutingStateItemSeq;
+ /**
+ * Listener interface. Typically implemented by
+ * a routing service in standby mode.
+ */
interface RoutingStateReplicatorListener
{
void stateRemoved(Ice::StringSeq itemKeys);
void stateSet(RoutingStateItemSeq items);
};
+ /**
+ * The state replicator interface.
+ */
interface RoutingStateReplicator
{
void addListener(RoutingStateReplicatorListener *listener);
@@ -58,106 +78,84 @@ module V1
idempotent RoutingStateItemSeq getAllState();
};
- ///////////////////////////////////
- // These state items represent the state transistions of the RouteSession operation.
+ /**
+ * All transactional operations will derive from this.
+ */
+ class OperationStateItem extends RoutingStateItem
+ {
+ string transactionId;
+ };
+
+ ///////////////////////////////////////////////////////////////////////
+ // These state items represent the state transistions
+ // of the RouteSession operation.
- /**
+ /**
* Indicates the RouteSessionOperation started.
- * The key (in the base state item) is the transactionId of this operation + ".START".
+ * The key (in the base state item) is the transactionId of this
+ * operation + RouteSessionOpStartKeyMod
*/
- class RouteSessionOpStart extends RoutingStateItem
+ class RouteSessionOpStart extends OperationStateItem
{
- string transactionId;
AsteriskSCF::SessionCommunications::V1::Session *source;
string destination;
};
-
const string RouteSessionOpStartKeyMod = ".START";
/**
* Indicates the RouteSessionOperation is waiting for an AMI endpoint lookup() reply.
- * The key (in the base state item) is the transactionId of this operation + ".WAITLOOKUP".
+ * The key (in the base state item) is the transactionId of this
+ * operation + RouteSessionOpWaitLookupKeyMod
*/
- class RouteSessionOpWaitLookupState extends RoutingStateItem
+ class RouteSessionOpWaitLookupState extends OperationStateItem
{
- string transactionId;
};
-
const string RouteSessionOpWaitLookupKeyMod = ".WAITLOOKUP";
/**
* Indicates the RouteSessionOperation is going to create the bridge.
- * The key (in the base state item) is the transactionId of this operation + ".BRIDGING".
+ * The key (in the base state item) is the transactionId of this
+ * operation + RouteSessionOpBridgingKeyMod
*/
- class RouteSessionOpBridgingState extends RoutingStateItem
+ class RouteSessionOpBridgingState extends OperationStateItem
{
- string transactionId;
AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
};
-
const string RouteSessionOpBridgingKeyMod = ".BRIDGING";
-
- /////////////////////////////////////
- // These state items represent the state transistions of the ConnectBridgedSessionsWithDestination operation.
+ ///////////////////////////////////////////////////////////////////////
+ // These state items represent the state transistions of the
+ // ConnectBridgedSessionsWithDestination operation.
/**
* Indicates the ConnectBridgedSessionsWithDestinationOperation started.
- * The key (in the base state item) is the transactionId of this operation.
+ * The key (in the base state item) is the transactionId of this
+ * operation + ConnectBridgedSessionsWithDestStartKeyMod
*/
- class ConnectBridgedSessionsWithDestinationOpStart
+ class ConnectBridgedSessionsWithDestinationOpStart extends OperationStateItem
{
- string transactionId;
AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
string destination;
};
+ const string ConnectBridgedSessionsWithDestStartKeyMod = ".START";
- class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends RoutingStateItem
+ class ConnectBridgedSessionsWithDestinationOpWaitLookupState extends OperationStateItem
{
- string transactionId;
};
+ const string ConnectBridgedSessionsWithDestWaitLookupKeyMod = ".WAITLOOKUP";
- class ConnectBridgedSessionsWithDestinationOpBridgingState extends RoutingStateItem
+ class ConnectBridgedSessionsWithDestinationOpBridgingState extends OperationStateItem
{
- string transactionId;
AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
};
+ const string ConnectBridgedSessionsWithDestBridgingKeyMod = ".BRIDGING";
- /**
- * Indicates the ConnectBridgedSessionsWithDestinationOperation completed.
- * The key (in the base state item) is the transactionId of this operation.
- */
- class ConnectBridgedSessionsWithDestinationOpComplete extends RoutingStateItem
- {
- string transactionId;
- };
+ ///////////////////////////////////////////////////////////////////////
+ // NOTE: There is no value in replicating the ConnectBridgedSessions
+ // operation. No intermediate results are obtained to cache.
- /////////////////////////////////////
- // These state items represent the state transistions of the ConnectBridgedSessions operation.
- /**
- * Indicates the ConnectBridgedSessionsOperation started.
- * The key (in the base state item) is the transactionId of this operation.
- */
- class ConnectBridgedSessionsOpStart extends RoutingStateItem
- {
- string transactionId;
- AsteriskSCF::SessionCommunications::V1::Session *sessionToReplace;
- AsteriskSCF::SessionCommunications::V1::Session *bridgedSession;
-
- string destination;
- };
-
- /**
- * Indicates the ConnectBridgedSessionsOperation completed.
- * The key (in the base state item) is the transactionId of this operation.
- */
- class ConnectBridgedSessionsOpComplete extends RoutingStateItem
- {
- string transactionId;
- };
-
- ///////////////////////////////////
+ ///////////////////////////////////////////////////////////////////////
// Endpoint locator state items
/**
@@ -178,6 +176,17 @@ module V1
{
};
+ /**
+ * Represents a change of the endpoint locators
+ * managed ids.
+ * The key (in the base state item) is the locatorId.
+ */
+ class EndpointLocatorSetDestIds extends RoutingStateItem
+ {
+ AsteriskSCF::Core::Routing::V1::RegExSeq regExList;
+ };
+
+
}; //module V1
}; //module BasicRouting
}; //module Asterisk SCF
diff --git a/local-slice/CMakeLists.txt b/local-slice/CMakeLists.txt
new file mode 100644
index 0000000..71102ba
--- /dev/null
+++ b/local-slice/CMakeLists.txt
@@ -0,0 +1,4 @@
+# Compile Basic Routing Service Component's own slice
+
+asterisk_scf_slice_include_directories("${CMAKE_SOURCE_DIR}/slice")
+asterisk_scf_compile_slice(BasicRoutingStateReplicationIf.ice lib "Basic Routing State Replicator" BasicRoutingService)
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 0babd69..4b9d3c4 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -36,6 +36,7 @@
#include "EndpointRegistry.h"
#include "RoutingAdmin.h"
#include "SessionRouter.h"
+#include "OperationReplicaCache.h"
using namespace std;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -65,6 +66,7 @@ public:
: mDone(false),
mInitialized(false),
mRunning(false),
+ mSessionContext(new SessionContext()),
mWorkQueue( new AsteriskSCF::Threading::SimpleWorkQueue("SessionRouterWorkQueue", lg))
{
}
@@ -108,6 +110,8 @@ private:
Discovery::V1::ServiceManagementPrx mComponentServiceManagement;
Discovery::V1::ServiceManagementPrx mSessionRouterManagement;
+ SessionContextPtr mSessionContext;
+
// Our published interfaces.
BasicSessionRouterPtr mSessionRouter;
RoutingServiceAdminPtr mAdminInteface;
@@ -115,6 +119,7 @@ private:
ComponentTestPtr mComponentTest;
AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> mBridgeManager;
RoutingServiceEventPublisherPtr mEventPublisher;
+ boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
EndpointRegistryPtr mEndpointRegistry;
// Replication support
@@ -194,7 +199,13 @@ private:
class ReplicaImpl : public Replica
{
public:
- ReplicaImpl(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter, bool active) : mApp(app), mAdapter(adapter), mPaused(false), mActive(active)
+ /**
+ * Constructor.
+ * @param app
+ * @param adapter The adapter is assumed to have been activated.
+ * @param active
+ */
+ ReplicaImpl(BasicRoutingServiceApp &app, Ice::ObjectAdapterPtr adapter, bool active) : mApp(app), mAdapter(adapter), mActive(active)
{
if (mActive)
{
@@ -261,7 +272,6 @@ private:
*/
vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
- bool mPaused;
bool mActive;
BasicRoutingServiceApp& mApp;
@@ -416,7 +426,8 @@ void BasicRoutingServiceApp::locateBridgeManager()
mServiceLocator,
new ServiceLocatorParams(BridgeServiceDiscoveryCategory),
lg);
- mSessionRouter->setBridgeManager(mBridgeManager);
+
+ mSessionContext->bridgeManager = mBridgeManager;
if (!mBridgeManager.isInitialized())
{
@@ -436,12 +447,16 @@ void BasicRoutingServiceApp::locateStateReplicator()
AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> pw(mServiceLocator, replicatorParams, lg);
mStateReplicator = pw;
- mSessionRouter->setStateReplicator(mStateReplicator);
+ pushStateReplicator();
}
+/**
+ * Pushes the state replicator to everything in this components that needs to know.
+ */
void BasicRoutingServiceApp::pushStateReplicator()
{
- mEventPublisher->setStateReplicator(mStateReplicator);
+ mEndpointRegistry->setStateReplicator(mStateReplicator);
+ mSessionContext->stateReplicator = mStateReplicator;
}
/**
@@ -475,9 +490,20 @@ void BasicRoutingServiceApp::initialize()
// as a facet of ComponentService.
mComponentTest = new ComponentTestImpl(*this);
#endif
+ // Create the session context needed to construct operations.
+ SessionContext *rawSessionContext(new SessionContext(mAdapter,
+ mEndpointRegistry,
+ mEventPublisher,
+ mWorkQueue));
+ SessionContextPtr sessionContextPtr(rawSessionContext);
+ mSessionContext = sessionContextPtr;
+
+ // Create the replica cache.
+ boost::shared_ptr<OperationReplicaCache> ptr(new OperationReplicaCache(mSessionContext));
+ mOperationReplicaCache = ptr;
// Create publish the SessionRouter interface.
- SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher, mWorkQueue));
+ SessionRouter *rawSessionRouter(new SessionRouter(mSessionContext, mOperationReplicaCache));
BasicSessionRouterPtr basicSessionPtr(rawSessionRouter);
mSessionRouter = basicSessionPtr;
mAdapter->add(rawSessionRouter, mCommunicator->stringToIdentity(SessionRouterObjectId));
@@ -496,7 +522,7 @@ void BasicRoutingServiceApp::initialize()
mAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
// Create and publish our state replicator listener interface.
- mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry);
+ mReplicatorListener = new RoutingStateReplicatorListenerI(mEndpointRegistry, mOperationReplicaCache);
mReplicatorListenerProxy = RoutingStateReplicatorListenerPrx::uncheckedCast(mAdapter->addWithUUID(mReplicatorListener));
mAdapter->activate();
diff --git a/src/BasicRoutingStateReplicator.h b/src/BasicRoutingStateReplicator.h
index 9b267f9..c603bbc 100644
--- a/src/BasicRoutingStateReplicator.h
+++ b/src/BasicRoutingStateReplicator.h
@@ -20,6 +20,7 @@
#include <AsteriskSCF/StateReplicator.h>
#include "BasicRoutingStateReplicationIf.h"
#include "EndpointRegistry.h"
+#include "OperationReplicaCache.h"
namespace AsteriskSCF
{
@@ -40,7 +41,7 @@ typedef IceUtil::Handle<RoutingStateReplicatorI> RoutingStateReplicatorIPtr;
class RoutingStateReplicatorListenerI : public AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorListener
{
public:
- RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry);
+ RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache);
~RoutingStateReplicatorListenerI();
void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
void stateSet(const AsteriskSCF::BasicRoutingService::V1::RoutingStateItemSeq&, const Ice::Current&);
diff --git a/src/BasicRoutingStateReplicatorApp.cpp b/src/BasicRoutingStateReplicatorApp.cpp
index 65f0bc7..4248064 100644
--- a/src/BasicRoutingStateReplicatorApp.cpp
+++ b/src/BasicRoutingStateReplicatorApp.cpp
@@ -168,7 +168,6 @@ void BasicRoutingStateReplicatorService::registerWithServiceLocator(const Ice::C
mServiceLocatorManagement->addCompare(compareGuid, compareProxy);
mStateReplicationManagement->addLocatorParams(discoveryParams, compareGuid);
- // TBD... We may have other interfaces to publish to the Service Locator.
}
catch(...)
{
@@ -202,9 +201,12 @@ void BasicRoutingStateReplicatorService::initialize(const std::string appName, c
getLoggerFactory().setLogOutput(mIceLogger->getLogger());
mAppName = appName;
+
// Create and publish our ComponentService interface support.
mComponentService = new ComponentServiceImpl(*this);
mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
+
+ // Create our instance of the StateReplicator template.
mStateReplicator = new RoutingStateReplicatorI();
mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
diff --git a/src/BasicRoutingStateReplicatorListener.cpp b/src/BasicRoutingStateReplicatorListener.cpp
index d8271fa..53def89 100644
--- a/src/BasicRoutingStateReplicatorListener.cpp
+++ b/src/BasicRoutingStateReplicatorListener.cpp
@@ -20,6 +20,7 @@
#include <boost/shared_ptr.hpp>
#include "BasicRoutingStateReplicator.h"
+#include "OperationReplicaCache.h"
using namespace AsteriskSCF::BasicRoutingService::V1;
@@ -50,8 +51,10 @@ private:
struct RoutingStateReplicatorListenerImpl
{
public:
- RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry)
- : mId(IceUtil::generateUUID())
+ RoutingStateReplicatorListenerImpl(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+ : mId(IceUtil::generateUUID()),
+ mEndpointRegistry(registry),
+ mOperationReplicaCache(opCache)
{
}
@@ -61,30 +64,74 @@ public:
void setStateNoticeImpl(const RoutingStateItemSeq& items, const Ice::Current& current)
{
- for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
- {
- EndpointLocatorAddPtr locatorAdd;
- EndpointLocatorRemovePtr locatorRemove;
- boost::shared_ptr<RoutingStateReplicatorItem> localitem;
+ class visitor : public AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitor
+ {
+ public:
+ visitor(RoutingStateReplicatorListenerImpl *impl) : mImpl(impl)
+ {
+ }
+
+ private:
+ RoutingStateReplicatorListenerImpl *mImpl;
+
+ void visitRoutingStateItem(const ::AsteriskSCF::BasicRoutingService::V1::RoutingStateItemPtr&)
+ {
+
+ }
+
+ void visitRouteSessionOpStart(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr&)
+ {
+ }
+
+ void visitRouteSessionOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr&)
+ {
+ }
+
+ void visitRouteSessionOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr&)
+ {
+ }
- // Depending on the type of state item we apply it differently
- if ((locatorAdd = EndpointLocatorAddPtr::dynamicCast((*item))))
+ void visitConnectBridgedSessionsWithDestinationOpWaitLookupState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr&)
{
- mEndpointRegistry->addEndpointLocator(locatorAdd->key, locatorAdd->regExList, locatorAdd->locator, current);
}
- else if ((locatorRemove = EndpointLocatorRemovePtr::dynamicCast((*item))))
+
+ void visitConnectBridgedSessionsWithDestinationOpBridgingState(const ::AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr&)
+ {
+ }
+
+ void visitEndpointLocatorAdd(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorAddPtr& item)
{
- mEndpointRegistry->removeEndpointLocator(locatorAdd->key, current);
+ mImpl->mEndpointRegistry->addEndpointLocator(item->key, item->regExList, item->locator, ::Ice::Current());
}
+
+ void visitEndpointLocatorRemove(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorRemovePtr& item)
+ {
+ mImpl->mEndpointRegistry->removeEndpointLocator(item->key, ::Ice::Current());
+ }
+
+ void visitEndpointLocatorSetDestIds(const ::AsteriskSCF::BasicRoutingService::V1::EndpointLocatorSetDestIdsPtr& item)
+ {
+ mImpl->mEndpointRegistry->setEndpointLocatorDestinationIds(item->key, item->regExList, ::Ice::Current());
+ }
+
+ }; // end method-local visitor def
+
+ // Create the visitor. Smart pointer will cleanup when this method exits.
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateItemVisitorPtr v = new visitor(this);
+
+ for (RoutingStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
+ {
+ (*item)->visit(v);
}
}
std::string mId;
EndpointRegistryPtr mEndpointRegistry;
+ boost::shared_ptr<OperationReplicaCache> mOperationReplicaCache;
};
-RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry)
- : mImpl(new RoutingStateReplicatorListenerImpl(registry))
+RoutingStateReplicatorListenerI::RoutingStateReplicatorListenerI(const EndpointRegistryPtr& registry, const boost::shared_ptr<OperationReplicaCache>& opCache)
+ : mImpl(new RoutingStateReplicatorListenerImpl(registry, opCache))
{
}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 64b7aec..8024a59 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -36,7 +36,9 @@ asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperat
asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.cpp)
asterisk_scf_component_add_file(BasicRoutingService SessionListener.h)
asterisk_scf_component_add_file(BasicRoutingService SessionListener.cpp)
-asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicator.h)
+asterisk_scf_component_add_file(BasicRoutingService OperationReplicaCache.h)
+asterisk_scf_component_add_file(BasicRoutingService OperationReplicaCache.cpp)
+asterisk_scf_component_add_file(BasicRoutingStateReplicator BasicRoutingStateReplicator.h)
asterisk_scf_component_add_file(BasicRoutingService BasicRoutingStateReplicatorListener.cpp)
asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
index 4e45033..9c637d0 100644
--- a/src/ConnectBridgedSessionsOperation.cpp
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -50,16 +50,18 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
const ::Ice::Current& current,
- const SessionContext& context,
- OperationsManager* const listener)
+ const SessionContextPtr& context,
+ OperationsManager* const listener,
+ std::string transactionId)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr, ConnectBridgedSessionsOp::OperationState>(cb,
context,
listener,
- ConnectBridgedSessionsOp::STATE_CONNECT),
+ ConnectBridgedSessionsOp::STATE_CONNECT,
+ transactionId),
mInitiatorCallback(cb),
mSessionToReplace(sessionToReplace),
mBridgedSession(bridgedSession),
- mIceCurrent(current)
+ mIceCurrent(current)
{
Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
if (it == current.ctx.end())
@@ -72,6 +74,37 @@ ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_Sessi
mStateMachine.addState(ConnectBridgedSessionsOp::STATE_CONNECT, boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this));
}
+/**
+ * Factory method for the operation.
+ */
+ConnectBridgedSessionsOperationPtr ConnectBridgedSessionsOperation::create(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener)
+{
+ // We don't really care about the transaction id on this operation, since we don't replicate this operation.
+ Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
+ std::string transactionId = "unused";
+ if (it == current.ctx.end())
+ {
+ lg(Error) << "ConnectBridgedSessionsOperation() called with no transaction ID set. Ignored." ;
+ transactionId = (it->second);
+ }
+
+ ConnectBridgedSessionsOperationPtr ptr(new ConnectBridgedSessionsOperation(cb,
+ sessionToReplace,
+ bridgedSession,
+ current,
+ context,
+ listener,
+ transactionId));
+
+ return ptr;
+}
+
+
ConnectBridgedSessionsOperation::~ConnectBridgedSessionsOperation()
{
lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
@@ -110,7 +143,7 @@ void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
// Create a listener for the sessions not being replaced to handle early termination.
lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
- SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext->adapter, preserveSessions));
mListenerManager = listener;
// Get the bridge for the sessions being moved.
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
index fc9364f..2484d5a 100644
--- a/src/ConnectBridgedSessionsOperation.h
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -43,6 +43,10 @@ namespace ConnectBridgedSessionsOp
};
}
+class ConnectBridgedSessionsOperation;
+
+typedef boost::shared_ptr<ConnectBridgedSessionsOperation> ConnectBridgedSessionsOperationPtr;
+
/**
* This class represents an operation to replace one session in a Bridge with sessions
* from another bridge. No routing is actually performed. This operation exists here for consistency.
@@ -57,15 +61,25 @@ class ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskS
ConnectBridgedSessionsOp::OperationState>
{
public:
- ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ static ConnectBridgedSessionsOperationPtr create(
+ const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
const Ice::Current& current,
- const SessionContext& context,
+ const SessionContextPtr& context,
OperationsManager* const listener);
virtual ~ConnectBridgedSessionsOperation();
+protected:
+ ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener,
+ std::string transactionId);
+
private:
/**
* This is a state handler for one of this operation's states.
@@ -81,7 +95,6 @@ private:
::Ice::Current mIceCurrent;
}; // class ConnectBridgedSessionsOperation
-
} // end BasicRoutingService
} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
index 0ef9de0..80ca61c 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.cpp
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -18,12 +18,15 @@
#include <AsteriskSCF/logger.h>
+#include "BasicRoutingStateReplicationIf.h"
#include "ConnectBridgedSessionsWithDestinationOperation.h"
using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::System::Logging;
+using namespace ::AsteriskSCF::BasicRoutingService::V1;
+using namespace AsteriskSCF::StateMachine;
namespace
{
@@ -36,11 +39,114 @@ namespace BasicRoutingService
{
/**
- * This operation replaces one session in a Bridge with a new session routable by the
- * destination param. This is a specialization of SessionRouterOperation<T> that handles the
- * connectBridgedSessionsWithDestination() operation. The template parameter T is the type
- * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to
- * the initiator of this operation.
+ * This listener monitors the progress of an operation, and pushes relevant state to the replicator.
+ */
+class ConnectBridgedSessionsWithDestReplicatingListener : public SimpleStateMachine<ConnectBridgedSessionsWithDestinationOp::OperationState>::StateMachineListener
+{
+public:
+ ConnectBridgedSessionsWithDestReplicatingListener(ConnectBridgedSessionsWithDestinationOperationPtr op,
+ const AsteriskSCF::SmartProxy::SmartProxy<
+ AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
+ : mOperation(op), mReplicator(replicator)
+ {
+ }
+
+ /**
+ * This callback is called just before the execution of a state machine's current state handler.
+ */
+ void stateExecutionStart(ConnectBridgedSessionsWithDestinationOp::OperationState state)
+ {
+ switch(state)
+ {
+ case ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP:
+ {
+ // Push this information to the state replicator.
+ ConnectBridgedSessionsWithDestinationOpStartPtr opStart(new ConnectBridgedSessionsWithDestinationOpStart());
+ opStart->transactionId = mOperation->getTransactionId();
+ opStart->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestStartKeyMod;
+ opStart->sessionToReplace = mOperation->getSessionToReplace();
+ opStart->destination = mOperation->getDestination();
+
+ pushState(opStart);
+ }
+ break;
+
+ case ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS:
+ {
+ // We just completed the entire operation.
+ // Push this information to the state replicator.
+ ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup(new ConnectBridgedSessionsWithDestinationOpWaitLookupState());
+ waitLookup->transactionId = mOperation->getTransactionId();
+ waitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
+
+ pushState(waitLookup);
+ }
+ break;
+
+ case ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING:
+ {
+ // We just completed the entire operation.
+ // Push this information to the state replicator.
+ ConnectBridgedSessionsWithDestinationOpBridgingStatePtr bridgeOp(new ConnectBridgedSessionsWithDestinationOpBridgingState());
+ bridgeOp->transactionId = mOperation->getTransactionId();
+ bridgeOp->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
+ bridgeOp->endpoints = mOperation->getLookupResult();
+
+ pushState(bridgeOp);
+ }
+ break;
+ }
+ }
+
+ void pushState(RoutingStateItemPtr item)
+ {
+ RoutingStateItemSeq setItems;
+
+ setItems.push_back(item);
+ mReplicator->setState(setItems);
+
+ // Cache the keys of all pushed items.
+ mReplicatedStateKeys.push_back(item->key);
+ }
+
+ /**
+ * This callback is called just before the execution of a state machine's current state handler.
+ */
+ void stateExecutionComplete(ConnectBridgedSessionsWithDestinationOp::OperationState state)
+ {
+ }
+
+ /**
+ * This method is sent when the operation state machine is shutting down.
+ */
+ void shutdown()
+ {
+ // We just completed the entire operation.
+ // Remove the items that represented this operation's state transitions from the state replicator.
+ mReplicator->removeState(mReplicatedStateKeys);
+
+ // Release our reference to the operation.
+ mOperation.reset();
+ }
+
+ /**
+ * This is called when a state transition is occuring.
+ */
+ void stateTransition(ConnectBridgedSessionsWithDestinationOp::OperationState oldState,
+ ConnectBridgedSessionsWithDestinationOp::OperationState newState)
+ {
+ }
+
+private:
+ Ice::StringSeq mReplicatedStateKeys;
+ ConnectBridgedSessionsWithDestinationOperationPtr mOperation;
+ AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mReplicator;
+
+}; // end ConnectBridgedSessionsWithDestReplicatingListener
+
+/**
+ * Primary constructor. This class represents an operation that replaces one session in a
+ * Bridge with a new session routable by the destination param.
*
* This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread.
*/
@@ -48,29 +154,78 @@ ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestin
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
const ::std::string& destination,
const ::Ice::Current& current,
- const SessionContext& context,
- OperationsManager* const listener)
+ const SessionContextPtr& context,
+ OperationsManager* const listener,
+ std::string transactionId)
: SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(cb,
context,
listener,
- ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP),
+ ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP,
+ transactionId),
mInitiatorCallback(cb),
mSessionToReplace(sessionToReplace),
mDestination(destination),
mIceCurrent(current)
{
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
+ mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+}
+
+/**
+ * This is the factory method for this operation.
+ */
+ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDestinationOperation::create(
+ const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener)
+{
Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
if (it == current.ctx.end())
{
lg(Error) << "ConnectBridgedSessionsWithDestinationOperation() called with no transaction ID set!" ;
throw InvalidParamsException();
}
+ std::string transactionId = (it->second);
- mTransactionId = (it->second);
+ ConnectBridgedSessionsWithDestinationOperationPtr op( new ConnectBridgedSessionsWithDestinationOperation(cb,
+ sessionToReplace,
+ destination,
+ current,
+ context,
+ listener,
+ transactionId) );
+
+ boost::shared_ptr<SimpleStateMachine<ConnectBridgedSessionsWithDestinationOp::OperationState>::StateMachineListener> replicatingListener(new ConnectBridgedSessionsWithDestReplicatingListener(op, context->stateReplicator));
+ op->addStateMachineListener(replicatingListener);
+
+ return op;
+}
+
+/**
+ * Constructor to service replicas.
+ */
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context)
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr, ConnectBridgedSessionsWithDestinationOp::OperationState>(
+ context,
+ ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP
+ )
+{
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this));
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::waitOnLookupState, this));
mStateMachine.addState(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING, boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this));
+}
+/**
+ * Factory for replica objects.
+ */
+ConnectBridgedSessionsWithDestinationOperationPtr ConnectBridgedSessionsWithDestinationOperation::createReplica(const SessionContextPtr& context)
+{
+ ConnectBridgedSessionsWithDestinationOperationPtr op(new ConnectBridgedSessionsWithDestinationOperation(context));
+ return op;
}
ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDestinationOperation()
@@ -78,6 +233,47 @@ ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDesti
lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
}
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem)
+{
+ ConnectBridgedSessionsWithDestinationOpStartPtr start;
+ ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr waitLookup;
+ ConnectBridgedSessionsWithDestinationOpBridgingStatePtr bridging;
+
+ if ((start = ConnectBridgedSessionsWithDestinationOpStartPtr::dynamicCast(stateItem)) != 0)
+ {
+ reflectUpdate(start);
+ }
+ else if ((waitLookup = ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr::dynamicCast(stateItem)) != 0)
+ {
+ reflectUpdate(waitLookup);
+ }
+ else if ((bridging = ConnectBridgedSessionsWithDestinationOpBridgingStatePtr::dynamicCast(stateItem)) != 0)
+ {
+ reflectUpdate(bridging);
+ }
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr item)
+{
+ mSessionToReplace = item->sessionToReplace;
+ mDestination = item->destination;
+ mTransactionId = item->transactionId;
+
+ mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_LOOKUP);
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr item)
+{
+ mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_WAIT_LOOKUP_RESULTS);
+}
+
+void ConnectBridgedSessionsWithDestinationOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr item)
+{
+ mLookupResult = item->endpoints;
+
+ mReplicatedStates.push_back(ConnectBridgedSessionsWithDestinationOp::STATE_BRIDGING);
+}
+
/**
* This is a state handler for one of this operation's states.
*/
@@ -109,7 +305,7 @@ void ConnectBridgedSessionsWithDestinationOperation::lookupState()
// The wrapper we're using will remove the listener and free it when
// this method is left.
lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
- SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext->adapter, mSessionToReplace));
mListenerManager = listener;
// Route the destination
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
index 61557a2..b1d35b1 100644
--- a/src/ConnectBridgedSessionsWithDestinationOperation.h
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -46,6 +46,10 @@ namespace ConnectBridgedSessionsWithDestinationOp
};
}
+class ConnectBridgedSessionsWithDestinationOperation;
+
+typedef boost::shared_ptr<ConnectBridgedSessionsWithDestinationOperation> ConnectBridgedSessionsWithDestinationOperationPtr;
+
/**
* This operation replaces one session in a Bridge with a new session routable by the
* destination param. This is a specialization of SessionRouterOperation<T> that handles the
@@ -59,17 +63,51 @@ namespace ConnectBridgedSessionsWithDestinationOp
ConnectBridgedSessionsWithDestinationOp::OperationState>
{
public:
- ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
- const std::string& destination,
- const ::Ice::Current& current,
- const SessionContext& context,
- OperationsManager* const listener);
+ /**
+ * Factory method for the class. This method creates an active operation.
+ */
+ static ConnectBridgedSessionsWithDestinationOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
virtual ~ConnectBridgedSessionsWithDestinationOperation();
+ AsteriskSCF::SessionCommunications::V1::SessionPrx getSessionToReplace() {return mSessionToReplace;}
+
+ std::string getDestination() {return mDestination;}
+
+ /**
+ * Factory method for replica objects.
+ */
+ static ConnectBridgedSessionsWithDestinationOperationPtr createReplica(const SessionContextPtr& context);
+
+ /**
+ * Update a replica object with new state information.
+ */
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
+
+protected:
+ ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener,
+ std::string transactionId);
+
+ // Constructor to service replicas.
+ ConnectBridgedSessionsWithDestinationOperation(const SessionContextPtr& context);
+
private:
+
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpStartPtr stateItem);
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpWaitLookupStatePtr stateItem);
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::ConnectBridgedSessionsWithDestinationOpBridgingStatePtr stateItem);
+
/**
* This is a state handler for one of this operation's states.
*/
@@ -101,9 +139,9 @@ private:
AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge;
AsteriskSCF::SessionCommunications::V1::SessionSeq mRemainingSessions;
-}; // class ConnectBridgedSessionsWithDestinationOperation
+ std::vector<ConnectBridgedSessionsWithDestinationOp::OperationState> mReplicatedStates;
+}; // class ConnectBridgedSessionsWithDestinationOperation
-
} // end BasicRoutingService
} // end AsteriskSCF
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 34eb1a2..3c1fe6e 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -26,11 +26,13 @@
#include "RoutingServiceEventPublisher.h"
#include "ScriptProcessor.h"
+using namespace ::std;
using namespace ::AsteriskSCF::Core::Endpoint::V1;
using namespace ::AsteriskSCF::Core::Routing::V1;
using namespace ::AsteriskSCF::System::Logging;
using namespace ::AsteriskSCF::Core::Routing::V1::Event;
-using namespace ::std;
+using namespace ::AsteriskSCF::BasicRoutingService::V1;
+using namespace ::AsteriskSCF::SmartProxy;
namespace
{
@@ -41,7 +43,6 @@ namespace AsteriskSCF
{
namespace BasicRoutingService
{
-
struct RegisteredLocator
{
public:
@@ -115,8 +116,119 @@ public:
destination.insert(mEndpointLocatorMap.begin(), mEndpointLocatorMap.end());
}
+ /**
+ * Forwards the result of processing a remove endpoint operation.
+ */
+ void forwardRemoveEndpointLocator(const std::string& locatorId, Event::OperationResult result)
+ {
+ if (!mActive)
+ {
+ return;
+ }
+
+ // Forward to state replicator
+ if (result == Event::SUCCESS)
+ {
+ try
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq setItems;
+
+ EndpointLocatorRemovePtr removeEndpointItem(new EndpointLocatorRemove());
+ removeEndpointItem->key = locatorId;
+
+ setItems.push_back(removeEndpointItem);
+
+ mStateReplicator->setState(setItems);
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Debug) << "EndpointRegistry unable to replicate removeEndpointLocator(): " << e.what();
+ }
+ }
+
+ // Forward to event publisher.
+ mEventPublisher->removeEndpointLocatorEvent(locatorId, result);
+ }
+
+ /**
+ * Forwards the result of processing a remove endpoint operation.
+ */
+ void forwardAddEndpointLocator(const std::string& locatorId,
+ const RegExSeq& regexList,
+ const EndpointLocatorPrx& locator,
+ Event::OperationResult result)
+ {
+ if (!mActive)
+ {
+ return;
+ }
+
+ // Forward to state replicator
+ if (result == Event::SUCCESS)
+ {
+ try
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq setItems;
+
+ EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
+ addEndpointItem->key = locatorId;
+ addEndpointItem->locator = locator;
+ addEndpointItem->regExList = regexList;
+
+ setItems.push_back(addEndpointItem);
+
+ mStateReplicator->setState(setItems);
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
+ }
+ }
+
+ // Forward to event publisher.
+ mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, result);
+ }
+
+ void forwardEndpointLocatorDestIdChange(const std::string& locatorId,
+ const RegExSeq& regexList,
+ OperationResult result)
+ {
+ if (!mActive)
+ {
+ return;
+ }
+
+ // Forward to state replicator
+ if (result == Event::SUCCESS)
+ {
+ try
+ {
+ // Push this information to the state replicator.
+ RoutingStateItemSeq setItems;
+
+ EndpointLocatorSetDestIdsPtr setDestIdsItem(new EndpointLocatorSetDestIds());
+ setDestIdsItem->key = locatorId;
+ setDestIdsItem->regExList = regexList;
+
+ setItems.push_back(setDestIdsItem);
+
+ mStateReplicator->setState(setItems);
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Debug) << "EndpointRegistry unable to replicate addEndpointLocator(): " << e.what();
+ }
+ }
+
+ mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regexList, Event::FAILURE);
+ }
+
boost::shared_mutex mLock;
+ AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx> mStateReplicator;
+
boost::shared_ptr<ScriptProcessor> mScriptProcessor;
EndpointLocatorMap mEndpointLocatorMap;
const RoutingEventsPtr mEventPublisher;
@@ -231,6 +343,11 @@ void EndpointRegistry::setActive(bool isActive)
mImpl->mActive = isActive;
}
+void EndpointRegistry::setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<RoutingStateReplicatorPrx>& replicator)
+{
+ mImpl->mStateReplicator = replicator;
+}
+
/**
* Returns the endpoints that match the specified destination id.
* @param id String identifier of the the destination.
@@ -341,13 +458,13 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
RegisteredLocator newLocator(locator, regexList);
mImpl->insertLocatorMapItem(locatorId, newLocator);
- mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, Event::SUCCESS);
+ mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::SUCCESS);
}
catch (...)
{
lg(Error) << "Exception adding EndpointLocator.";
- mImpl->mEventPublisher->addEndpointLocatorEvent(locatorId, regexList, locator, Event::FAILURE);
+ mImpl->forwardAddEndpointLocator(locatorId, regexList, locator, Event::FAILURE);
return;
}
}
@@ -368,19 +485,19 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
if (!exists)
{
lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
- mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
+ mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
return;
}
mImpl->eraseLocatorMapItem(locatorId);
- mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::SUCCESS);
+ mImpl->forwardRemoveEndpointLocator(locatorId, Event::SUCCESS);
lg(Info) << "Removed Endpoint Locator with Id = " << locatorId;
}
catch(const std::exception &e)
{
- mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
+ mImpl->forwardRemoveEndpointLocator(locatorId, Event::FAILURE);
lg(Error) << e.what();
}
}
@@ -401,18 +518,18 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
if (!exists)
{
- mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
+ mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, Event::FAILURE);
throw DestinationNotFoundException(locatorId);
}
// Replace the regular expression.
existing->second.setRegEx(regExList);
- mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::SUCCESS);
+ mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, Event::SUCCESS);
}
catch(const std::exception &e)
{
- mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
+ mImpl->forwardEndpointLocatorDestIdChange(locatorId, regExList, Event::FAILURE);
lg(Error) << "Exception modifying the destination specifications for EndpointLocator " << locatorId;
lg(Error) << " - " << e.what();
}
diff --git a/src/OperationReplicaCache.cpp b/src/OperationReplicaCache.cpp
new file mode 100644
index 0000000..3045ab8
--- /dev/null
+++ b/src/OperationReplicaCache.cpp
@@ -0,0 +1,195 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <AsteriskSCF/Threading/WorkQueue.h>
+
+#include "OperationReplicaCache.h"
+#include "SessionRouterOperation.h"
+
+using namespace AsteriskSCF::Threading;
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+typedef std::map<std::string, AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr> StateItemMapType;
+
+/**
+ * For each transaction id, we're going to cache the all the state items for the operation.
+ * The reason for this is that we don't want to rely on the order in which we will receive the state updates.
+ * This class is used to hold all the state updates for a given operation.
+ *
+ * When an item is called for from the cache, we'll apply all the available state updates we have,
+ * in order, up to the highest we have (without missing a state, since each state depends on the results
+ * of the previous.)
+ *
+ * NOTE: TBD...This class needs some templatized functions to avoid all the duplication in the case statements.
+ */
+template<typename T>
+struct OperationReplicaItem
+{
+public:
+ T mOperation;
+ StateItemMapType mItems;
+};
+
+typedef std::map<std::string, OperationReplicaItem<RouteSessionOperationPtr> > RouteSessionMapType;
+typedef std::map<std::string, OperationReplicaItem<ConnectBridgedSessionsWithDestinationOperationPtr> > ConnectBridgeWithDestMapType;
+
+class OperationReplicaCachePriv
+{
+public:
+
+ OperationReplicaCachePriv(const SessionContextPtr& sessionContext)
+ : mSessionContext(sessionContext)
+ {
+ }
+
+ SessionContextPtr mSessionContext;
+ RouteSessionMapType routeSessionReplicas;
+ ConnectBridgeWithDestMapType connectBridgedWithDestReplicas;
+};
+
+OperationReplicaCache::OperationReplicaCache(const SessionContextPtr& sessionContext)
+ : mPriv(new OperationReplicaCachePriv(sessionContext))
+{
+}
+
+
+void OperationReplicaCache::cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item)
+{
+ switch(type)
+ {
+ case ROUTE_SESSION_OP:
+ {
+ // See if this transaction is in the cache.
+ RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(item->transactionId);
+ if (i == mPriv->routeSessionReplicas.end())
+ {
+ // Add an entry to the cache.
+ OperationReplicaItem<RouteSessionOperationPtr> replica;
+ mPriv->routeSessionReplicas[item->transactionId] = replica;
+
+ i = mPriv->routeSessionReplicas.find(item->transactionId);
+ }
+ (*i).second.mItems[item->key] = item;
+
+ // If we haven't created the replica yet, do so now.
+ if ((*i).second.mOperation.get() == 0)
+ {
+ RouteSessionOperationPtr work(RouteSessionOperation::createReplica(mPriv->mSessionContext));
+ (*i).second.mOperation = work;
+ }
+
+ // Update the replicated object with newest state update.
+ (*i).second.mOperation->reflectUpdate(item);
+ }
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_OP:
+ // Not replicating this type.
+ break;
+ }
+}
+
+bool OperationReplicaCache::fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref)
+{
+ switch(type)
+ {
+ case ROUTE_SESSION_OP:
+ {
+ if (mPriv->routeSessionReplicas.empty())
+ {
+ return false;
+ }
+
+ RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
+ if (i == mPriv->routeSessionReplicas.end())
+ {
+ return false;
+ }
+
+ ref = (*i).second.mOperation;
+ mPriv->routeSessionReplicas.erase(i);
+
+ return true;
+ }
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+ {
+ if (mPriv->connectBridgedWithDestReplicas.empty())
+ {
+ return false;
+ }
+ }
+ break;
+ }
+
+ return false;
+}
+
+void OperationReplicaCache::dropRouteSessionOperation(OperationType type, std::string transactionId)
+{
+ switch(type)
+ {
+ case ROUTE_SESSION_OP:
+ {
+ RouteSessionMapType::iterator i = mPriv->routeSessionReplicas.find(transactionId);
+ if (i != mPriv->routeSessionReplicas.end())
+ {
+ mPriv->routeSessionReplicas.erase(i);
+ }
+ }
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+ {
+ ConnectBridgeWithDestMapType::iterator i = mPriv->connectBridgedWithDestReplicas.find(transactionId);
+ if (i != mPriv->connectBridgedWithDestReplicas.end())
+ {
+ mPriv->connectBridgedWithDestReplicas.erase(i);
+ }
+ }
+ break;
+ }
+}
+
+void OperationReplicaCache::clearCache(OperationType type)
+{
+ switch(type)
+ {
+ case ROUTE_SESSION_OP:
+ mPriv->routeSessionReplicas.clear();
+ break;
+
+ case CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP:
+ mPriv->connectBridgedWithDestReplicas.clear();
+ break;
+ }
+}
+
+void OperationReplicaCache::clearCache()
+{
+ mPriv->routeSessionReplicas.clear();
+ mPriv->connectBridgedWithDestReplicas.clear();
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/OperationReplicaCache.h b/src/OperationReplicaCache.h
new file mode 100644
index 0000000..ff89752
--- /dev/null
+++ b/src/OperationReplicaCache.h
@@ -0,0 +1,62 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/Threading/WorkQueue.h>
+
+#include "BasicRoutingStateReplicationIf.h"
+#include "SessionRouter.h"
+#include "RouteSessionOperation.h"
+#include "ConnectBridgedSessionsWithDestinationOperation.h"
+#include "ConnectBridgedSessionsOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+enum OperationType
+{
+ ROUTE_SESSION_OP = 0,
+ CONNECT_BRIDGED_SESSIONS_WITH_DEST_OP = 1,
+ CONNECT_BRIDGED_SESSIONS_OP = 2
+};
+
+class OperationReplicaCachePriv;
+class SessionContext;
+
+class OperationReplicaCache
+{
+public:
+ OperationReplicaCache(const boost::shared_ptr<SessionContext>& sessionContext);
+
+ void cacheOperation(OperationType type, const AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr& item);
+ bool fetchRouteSessionOperation(OperationType type, std::string transactionId, AsteriskSCF::Threading::WorkPtr& ref);
+ void dropRouteSessionOperation(OperationType type, std::string transactionId);
+
+ void clearCache(OperationType type);
+ void clearCache();
+
+private:
+ boost::shared_ptr<OperationReplicaCachePriv> mPriv;
+};
+typedef boost::shared_ptr<OperationReplicaCache> OperationReplicaCachePtr;
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
index 127d3d8..97d60d3 100644
--- a/src/RouteSessionOperation.cpp
+++ b/src/RouteSessionOperation.cpp
@@ -39,12 +39,12 @@ namespace BasicRoutingService
{
/**
- * This listener monitors the progress of an operation, and push
+ * This listener monitors the progress of an operation, and pushes relevant state to the replicator.
*/
-class RouteSessionReplicatingListener : public SimpleStateMachine<RouteSessionOp::OperationState, boost::function<void ()> >::StateMachineListener
+class RouteSessionReplicatingListener : public SimpleStateMachine<RouteSessionOp::OperationState>::StateMachineListener
{
public:
- RouteSessionReplicatingListener(RouteSessionOperation *op,
+ RouteSessionReplicatingListener(RouteSessionOperationPtr op,
const AsteriskSCF::SmartProxy::SmartProxy<
AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
: mOperation(op), mReplicator(replicator)
@@ -60,30 +60,26 @@ public:
{
case RouteSessionOp::STATE_LOOKUP:
{
+ // This is the initial state. All the state of interest is what's been passed in.
// Push this information to the state replicator.
- RoutingStateItemSeq setItems;
-
RouteSessionOpStartPtr routeSessionOpStart(new RouteSessionOpStart());
+ routeSessionOpStart->transactionId = mOperation->getTransactionId();
routeSessionOpStart->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartKeyMod;
routeSessionOpStart->source = mOperation->getSource();
routeSessionOpStart->destination = mOperation->getDestination();
- setItems.push_back(routeSessionOpStart);
- mReplicator->setState(setItems);
+ pushState(routeSessionOpStart);
}
break;
case RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS:
{
- // We just completed the entire operation.
- // Push this information to the state replicator.
- RoutingStateItemSeq setItems;
-
+ // We've sent out our lookup request via AMI.
RouteSessionOpWaitLookupStatePtr routeSessionOpWaitLookup(new RouteSessionOpWaitLookupState());
+ routeSessionOpWaitLookup->transactionId = mOperation->getTransactionId();
routeSessionOpWaitLookup->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod;
- setItems.push_back(routeSessionOpWaitLookup);
- mReplicator->setState(setItems);
+ pushState(routeSessionOpWaitLookup);
}
break;
@@ -91,40 +87,47 @@ public:
{
// We just completed the entire operation.
// Push this information to the state replicator.
- RoutingStateItemSeq setItems;
-
RouteSessionOpBridgingStatePtr routeSessionOpBridging(new RouteSessionOpBridgingState());
+ routeSessionOpBridging->transactionId = mOperation->getTransactionId();
routeSessionOpBridging->key = mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod;
routeSessionOpBridging->endpoints = mOperation->getLookupResult();
- setItems.push_back(routeSessionOpBridging);
- mReplicator->setState(setItems);
+ pushState(routeSessionOpBridging);
}
break;
}
}
+ void pushState(RoutingStateItemPtr item)
+ {
+ RoutingStateItemSeq setItems;
+
+ setItems.push_back(item);
+ mReplicator->setState(setItems);
+
+ // Cache the keys of all pushed items.
+ mReplicatedStateKeys.push_back(item->key);
+ }
+
/**
- * This callback is called just before the execution of a state machine's current state handler.
+ * This callback is called just after the execution of a state machine's current state handler.
*/
void stateExecutionComplete(RouteSessionOp::OperationState state)
{
- switch(state)
- {
- case RouteSessionOp::STATE_BRIDGING:
- // We just completed the entire operation.
- // Remove the items that represented this operation's state transitions.
- Ice::StringSeq stateKeys;
-
- stateKeys.push_back(mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartKeyMod);
- stateKeys.push_back(mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupKeyMod);
- stateKeys.push_back(mOperation->getTransactionId() + AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingKeyMod);
+ }
- mReplicator->removeState(stateKeys);
- break;
- }
+ /**
+ * This method is sent when the operation state machine is shutting down.
+ */
+ void shutdown()
+ {
+ // We just completed the entire operation.
+ // Remove the items that represented this operation's state transitions.
+ mReplicator->removeState(mReplicatedStateKeys);
+ // Release our reference to the operation.
+ mOperation.reset();
}
/**
@@ -135,15 +138,14 @@ public:
}
private:
- RouteSessionOperation* mOperation;
+ Ice::StringSeq mReplicatedStateKeys;
+ RouteSessionOperationPtr mOperation;
AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mReplicator;
};
/**
- * This is a specialization of the SessionRouterOperation<T> to handle the
- * routeSession() operation. The template parameter T is the type of the routeSession()
- * AMD callback handler to allow this object to send results to the initiator of this
- * operation.
+ * Constructor. This class is a specialization of the SessionRouterOperation<T> to handle the
+ * routeSession() operation.
*
* This object is an instance of WorkQueue::Work so that
* it can be enqueued to a worker thread.
@@ -152,33 +154,121 @@ RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessio
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
const ::std::string& destination,
const ::Ice::Current& current,
- const SessionContext& context,
- OperationsManager* const listener)
+ const SessionContextPtr& context,
+ OperationsManager* const listener,
+ std::string transactionId)
: SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(cb,
context,
listener,
- RouteSessionOp::STATE_LOOKUP),
+ RouteSessionOp::STATE_LOOKUP,
+ transactionId),
mInitiatorCallback(cb),
mSource(source),
mDestination(destination),
mIceCurrent(current)
{
+ // Configure the state machine with state handlers.
+ mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
+ mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
+ mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
+}
+
+/**
+ * This is the factory method for RouteSessionOperation.
+ */
+RouteSessionOperationPtr RouteSessionOperation::create(const AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener)
+{
Ice::Context::const_iterator it = current.ctx.find(::AsteriskSCF::SessionCommunications::V1::TransactionKey);
if (it == current.ctx.end())
{
- lg(Error) << "RouteSessionOperation() called with no transaction ID set!";
+ lg(Error) << "RouteSessionOperation() called with no transaction ID set in context. Unable to replicate operation.";
throw InvalidParamsException();
}
- mTransactionId = (it->second);
+ std::string transactionId = (it->second);
+
+ RouteSessionOperationPtr op (new RouteSessionOperation(cb,
+ source,
+ destination,
+ current,
+ context,
+ listener,
+ transactionId));
- boost::shared_ptr<SimpleStateMachine<RouteSessionOp::OperationState, boost::function<void ()> >::StateMachineListener > ptr(new RouteSessionReplicatingListener(this, context.stateReplicator));
- mStateMachine.addListener(ptr);
+ // Create a listener for pushing replication data.
+ boost::shared_ptr<SimpleStateMachine<RouteSessionOp::OperationState>::StateMachineListener> replicatingListener(new RouteSessionReplicatingListener(op, context->stateReplicator));
+ op->addStateMachineListener(replicatingListener);
+
+ return op;
+}
+/**
+ * Alternate constructor for replicas.
+ */
+RouteSessionOperation::RouteSessionOperation(const SessionContextPtr& sessionContext)
+ : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr, RouteSessionOp::OperationState>(sessionContext, RouteSessionOp::STATE_LOOKUP)
+{
+ // Configure the state machine with state handlers.
mStateMachine.addState(RouteSessionOp::STATE_LOOKUP, boost::bind(&RouteSessionOperation::lookupState, this));
mStateMachine.addState(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS, boost::bind(&RouteSessionOperation::waitOnLookupState, this));
mStateMachine.addState(RouteSessionOp::STATE_BRIDGING, boost::bind(&RouteSessionOperation::establishBridgeState, this));
}
+/**
+ * This is the factory method for creating a replica of a RouteSessionOperation.
+ */
+RouteSessionOperationPtr RouteSessionOperation::createReplica(const SessionContextPtr& sessionContext)
+{
+ RouteSessionOperationPtr op (new RouteSessionOperation(sessionContext));
+
+ return op;
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem)
+{
+ RouteSessionOpStartPtr start;
+ RouteSessionOpWaitLookupStatePtr waitLookup;
+ RouteSessionOpBridgingStatePtr bridging;
+
+ if ((start = RouteSessionOpStartPtr::dynamicCast(stateItem)) != 0)
+ {
+ reflectUpdate(start);
+ }
+ else if ((waitLookup = RouteSessionOpWaitLookupStatePtr::dynamicCast(stateItem)) != 0)
+ {
+ reflectUpdate(waitLookup);
+ }
+ else if ((bridging = RouteSessionOpBridgingStatePtr::dynamicCast(stateItem)) != 0)
+ {
+ reflectUpdate(bridging);
+ }
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr item)
+{
+ mSource = item->source;
+ mDestination = item->destination;
+ mTransactionId = item->transactionId;
+
+ mReplicatedStates.push_back(RouteSessionOp::STATE_LOOKUP);
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr item)
+{
+ mReplicatedStates.push_back(RouteSessionOp::STATE_WAIT_LOOKUP_RESULTS);
+}
+
+void RouteSessionOperation::reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr item)
+{
+ mLookupResult = item->endpoints;
+
+ mReplicatedStates.push_back(RouteSessionOp::STATE_BRIDGING);
+}
+
RouteSessionOperation::~RouteSessionOperation()
{
lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
@@ -193,7 +283,7 @@ void RouteSessionOperation::lookupState()
{
lg(Debug) << "routeSession() entered with destination " << mDestination ;
- if (!mSessionContext.bridgeManager.initializeOnce())
+ if (!mSessionContext->bridgeManager.initializeOnce())
{
lg(Error) << "No proxy to BridgeManager. "
"Make sure all services are running.";
@@ -205,7 +295,7 @@ void RouteSessionOperation::lookupState()
// Create a listener for the source to handle early termination.
// The wrapper we're using will remove the listener and free it when
// this method is left.
- SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSource));
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext->adapter, mSource));
mListenerManager = listener;
// Set the state handler to exectute once we've looked up our endpoints.
@@ -224,8 +314,9 @@ void RouteSessionOperation::lookupState()
*/
void RouteSessionOperation::waitOnLookupState()
{
- if (mFinished)
+ if (mFinished)
{
+ // An exception must have terminated this operation.
return;
}
@@ -285,7 +376,7 @@ void RouteSessionOperation::establishBridgeState()
bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
lg(Debug) << "routeSession(): Creating bridge.";
- bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+ bridge = mSessionContext->bridgeManager->createBridge(bridgedSessions, 0);
}
catch (const Ice::Exception &e)
{
@@ -312,6 +403,9 @@ void RouteSessionOperation::establishBridgeState()
// This operation is complete. Send AMD responses.
finishAndSendResult();
+
+ // Shutdown the state machine.
+ mStateMachine.shutdown();
}
} // end BasicRoutingService
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
index 2e83ae5..921f429 100644
--- a/src/RouteSessionOperation.h
+++ b/src/RouteSessionOperation.h
@@ -20,6 +20,8 @@
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include "BasicRoutingStateReplicationIf.h"
+
#include "SessionRouterOperation.h"
namespace AsteriskSCF
@@ -44,11 +46,12 @@ namespace RouteSessionOp
};
}
+class RouteSessionOperation;
+typedef boost::shared_ptr<RouteSessionOperation> RouteSessionOperationPtr;
+
/**
- * This is a specialization of the SessionRouterOperation<T> to handle the
- * routeSession() operation. The template parameter T is the type of the routeSession()
- * AMD callback handler to allow this object to send results to the initiator of this
- * operation.
+ * This is a specialization of the SessionRouterOperation<T,S> to handle the
+ * routeSession() operation.
*
* This object is an instance of WorkQueue::Work so that
* it can be enqueued to a worker thread.
@@ -58,12 +61,15 @@ class RouteSessionOperation : public SessionRouterOperation<AsteriskSCF::Sessio
RouteSessionOp::OperationState>
{
public:
- RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
- const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
- const ::std::string& destination,
- const ::Ice::Current& current,
- const SessionContext& context,
- OperationsManager* const listener);
+ /**
+ * Factory method for the class. This method creates an active routing operation.
+ */
+ static RouteSessionOperationPtr create(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener);
virtual ~RouteSessionOperation();
@@ -71,7 +77,44 @@ public:
std::string getDestination() {return mDestination;}
+ /**
+ * Factory method for replica objects.
+ */
+ static RouteSessionOperationPtr createReplica(const SessionContextPtr& context);
+
+ /**
+ * Update a replica object with new state information.
+ */
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::OperationStateItemPtr stateItem);
+
+protected:
+ // Normal constructor
+ RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContextPtr& context,
+ OperationsManager* const listener,
+ std::string transactionId);
+
+ // Constructor for replicas.
+ RouteSessionOperation(const SessionContextPtr& context);
+
private:
+ /**
+ * Update a replica object with new state information of a specific type.
+ */
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpStartPtr item);
+
+ /**
+ * Update a replica object with new state information of a specific type.
+ */
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpWaitLookupStatePtr item);
+
+ /**
+ * Update a replica object with new state information of a specific type.
+ */
+ void reflectUpdate(AsteriskSCF::BasicRoutingService::V1::RouteSessionOpBridgingStatePtr item);
/**
* This is a state handler for one of this operation's states.
@@ -104,6 +147,8 @@ private:
std::string mDestination;
::Ice::Current mIceCurrent;
+ std::vector<RouteSessionOp::OperationState> mReplicatedStates;
+
}; // class RouteSessionOperation
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index f6163a8..3863b03 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -143,8 +143,6 @@ public:
}
public:
- AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx> mStateReplicator;
-
Event::RoutingEventsPrx mEventTopic; // Using one-way proxy.
boost::mutex mLock;
@@ -152,7 +150,6 @@ private:
Ice::ObjectAdapterPtr mAdapter;
bool mInitialized;
bool mActive;
-
};
/**
@@ -168,11 +165,6 @@ void RoutingServiceEventPublisher::setActive(bool val)
mImpl->setActive(val);
}
-void RoutingServiceEventPublisher::setStateReplicator(const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::BasicRoutingService::V1::RoutingStateReplicatorPrx>& replicator)
-{
- mImpl->mStateReplicator = replicator;
-}
-
/**
* Send a message to the service's event topic to report a lookup event.
*/
@@ -204,27 +196,6 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (mImpl->isActive() && result == Event::SUCCESS)
- {
- try
- {
- // Push this information to the state replicator.
- RoutingStateItemSeq setItems;
-
- EndpointLocatorAddPtr addEndpointItem(new EndpointLocatorAdd());
- addEndpointItem->key = locatorId;
- addEndpointItem->locator = locator;
- addEndpointItem->regExList = regexList;
... 409 lines suppressed ...
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list