[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Jan 20 18:27:41 CST 2011
branch "route_async" has been updated
via 0f1310bcaa67642b0bc73bea931dfab7a673c49c (commit)
from 749bfb7746b6be3337c6eb750c129b35993c4297 (commit)
Summary of changes:
src/BasicRoutingServiceApp.cpp | 2 +-
src/CMakeLists.txt | 17 +-
src/ConnectBridgedSessionsOperation.cpp | 153 +++
src/ConnectBridgedSessionsOperation.h | 68 ++
...nectBridgedSessionsWithDestinationOperation.cpp | 188 ++++
...onnectBridgedSessionsWithDestinationOperation.h | 79 ++
src/EndpointRegistry.cpp | 114 +--
src/RouteSessionOperation.cpp | 184 ++++
src/RouteSessionOperation.h | 78 ++
src/RoutingServiceEventPublisher.cpp | 49 +-
src/SessionListener.cpp | 255 +++++
src/SessionListener.h | 117 ++
src/SessionRouter.cpp | 1147 +-------------------
src/SessionRouter.h | 6 +-
src/SessionRouterOperation.cpp | 270 +++++
src/SessionRouterOperation.h | 325 ++++++
src/SimpleWorkQueue.cpp | 275 -----
src/SimpleWorkQueue.h | 58 -
src/WorkQueue.h | 85 --
19 files changed, 1799 insertions(+), 1671 deletions(-)
create mode 100644 src/ConnectBridgedSessionsOperation.cpp
create mode 100644 src/ConnectBridgedSessionsOperation.h
create mode 100644 src/ConnectBridgedSessionsWithDestinationOperation.cpp
create mode 100644 src/ConnectBridgedSessionsWithDestinationOperation.h
create mode 100644 src/RouteSessionOperation.cpp
create mode 100644 src/RouteSessionOperation.h
create mode 100644 src/SessionListener.cpp
create mode 100644 src/SessionListener.h
create mode 100644 src/SessionRouterOperation.cpp
create mode 100644 src/SessionRouterOperation.h
delete mode 100644 src/SimpleWorkQueue.cpp
delete mode 100644 src/SimpleWorkQueue.h
delete mode 100644 src/WorkQueue.h
- Log -----------------------------------------------------------------
commit 0f1310bcaa67642b0bc73bea931dfab7a673c49c
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Jan 20 18:17:37 2011 -0600
Incorporation of review feedback.
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index f246a22..5a5bc78 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,7 +30,7 @@
#include "EndpointRegistry.h"
#include "RoutingAdmin.h"
#include "SessionRouter.h"
-#include "SimpleWorkQueue.h"
+#include "AsteriskSCF/WorkQueue/SimpleWorkQueue.h"
#include "IceLogger.h"
#include "logger.h"
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1f6833a..00fe756 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -19,9 +19,17 @@ asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.cpp)
asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
-asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
-asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
-asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
+
+asterisk_scf_component_add_file(BasicRoutingService SessionRouterOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService SessionRouterOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService RouteSessionOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService RouteSessionOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsWithDestinationOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsWithDestinationOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.h)
+asterisk_scf_component_add_file(BasicRoutingService ConnectBridgedSessionsOperation.cpp)
+asterisk_scf_component_add_file(BasicRoutingService SessionListener.h)
+asterisk_scf_component_add_file(BasicRoutingService SessionListener.cpp)
asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
@@ -34,8 +42,11 @@ include_directories(${logger_dir}/client/src)
include_directories(${utils_dir}/SmartProxy/src)
+include_directories(${util_cpp_dir}/WorkQueue/include)
+
asterisk_scf_component_build_icebox(BasicRoutingService)
target_link_libraries(BasicRoutingService ${LUA_LIBRARIES})
target_link_libraries(BasicRoutingService logging-client)
+target_link_libraries(BasicRoutingService WorkQueue)
#asterisk_scf_component_install(BasicRoutingService RUNTIME bin "Basic Routing Service" Core)
diff --git a/src/ConnectBridgedSessionsOperation.cpp b/src/ConnectBridgedSessionsOperation.cpp
new file mode 100644
index 0000000..86bdd22
--- /dev/null
+++ b/src/ConnectBridgedSessionsOperation.cpp
@@ -0,0 +1,153 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include "logger.h"
+
+#include "ConnectBridgedSessionsOperation.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace // unnamed namespace: the logger reference below is local to this file
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService"); // logger used by every lg(...) call in this file
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Constructs an operation that replaces one session in a Bridge with sessions
+ * from another bridge. No routing is actually performed; the operation exists here for consistency.
+ * The class is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of
+ * the connectBridgedSessions() AMD callback handler, which allows this object to send results to
+ * the initiator of this operation.
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ *
+ * @param cb AMD callback of the initiator; passed to the base class and also kept in mInitiatorCallback.
+ * @param sessionToReplace Session proxy stored in mSessionToReplace.
+ * @param bridgedSession Session proxy stored in mBridgedSession.
+ * @param current Ice invocation context; copied into mIceCurrent.
+ * @param context Passed through to the SessionRouterOperation base class.
+ * @param listener Passed through to the SessionRouterOperation base class.
+ */
+
+ConnectBridgedSessionsOperation::ConnectBridgedSessionsOperation(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>(cb,
+ context,
+ listener,
+ boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)), // handler bound to this object
+ mInitiatorCallback(cb),
+ mSessionToReplace(sessionToReplace),
+ mBridgedSession(bridgedSession),
+ mIceCurrent(current)
+{
+}
+
+ConnectBridgedSessionsOperation::~ConnectBridgedSessionsOperation() // Destructor body only writes a Debug-level trace.
+{
+ lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
+}
+
+/**
+ * State handler for this operation (the one bound in the constructor).
+ * Replaces mSessionToReplace in its bridge with the sessions that share a bridge
+ * with mBridgedSession.
+ *
+ * Every failure path reports through finishWithException() and returns; the
+ * success path ends with finishAndSendResult().
+ */
+void ConnectBridgedSessionsOperation::connectBridgedSessionsState()
+{
+    lg(Debug) << "connectBridgedSessions() entered... ";
+
+    // Get the bridge being merged into and the sessions that stay in it.
+    // Both lookups fail the operation in the same way, so they share a handler.
+    BridgePrx mergeBridge;
+    SessionSeq preserveSessions;
+    try
+    {
+        mergeBridge = getBridge(mSessionToReplace);
+        preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
+    }
+    catch (const Ice::Exception &e)
+    {
+        finishWithException(e);
+        return;
+    }
+
+    // Create a listener for the sessions not being replaced to handle early termination.
+    lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+    mListenerManager = listener;
+
+    // Get the bridge for the sessions being moved.
+    BridgePrx oldBridge;
+    try
+    {
+        oldBridge = getBridge(mBridgedSession);
+    }
+    catch (const Ice::Exception &e)
+    {
+        finishWithException(e);
+        return;
+    }
+
+    SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
+
+    // Check for early termination by the source.
+    if (mListenerManager->getListener()->isTerminated())
+    {
+        lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
+
+        // preserveSessions can legitimately be empty, so never index it unchecked.
+        // With no preserved session there is no endpoint id to report.
+        if (preserveSessions.size() > 0)
+        {
+            finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
+        }
+        else
+        {
+            finishWithException(SourceTerminatedPreBridgingException(""));
+        }
+        return;
+    }
+
+    // We're through listening, and we will probably interfere with the Bridge's functionality if
+    // we keep listening.
+    lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
+    mListenerManager->getListener()->unregister();
+
+    // Now replace the sessions.
+    try
+    {
+        lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
+        mergeBridge->replaceSession(mSessionToReplace, migratingSessions);
+    }
+    catch(const Ice::Exception& e)
+    {
+        lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
+        finishWithException(e);
+        return;
+    }
+
+    // This operation is complete. Send AMD responses.
+    finishAndSendResult();
+}
+
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsOperation.h b/src/ConnectBridgedSessionsOperation.h
new file mode 100644
index 0000000..17908b3
--- /dev/null
+++ b/src/ConnectBridgedSessionsOperation.h
@@ -0,0 +1,68 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "RoutingIf.h"
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+#include "SessionRouterOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * This class represents an operation to replace one session in a Bridge with sessions
+ * from another bridge. No routing is actually performed; the operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of
+ * the connectBridgedSessions() AMD callback handler, which allows this object to send results to
+ * the initiator of this operation.
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+class ConnectBridgedSessionsOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr>
+{
+public:
+ ConnectBridgedSessionsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener);
+
+ virtual ~ConnectBridgedSessionsOperation();
+
+private:
+ /**
+ * State handler for this operation.
+ * Replaces one session in a Bridge with sessions from another bridge.
+ */
+ void connectBridgedSessionsState();
+
+private:
+ // Operation input parameters, stored as given to the constructor.
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback; // AMD callback of the initiator
+ AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace; // constructor's sessionToReplace argument
+ AsteriskSCF::SessionCommunications::V1::SessionPrx mBridgedSession; // constructor's bridgedSession argument
+ ::Ice::Current mIceCurrent; // constructor's current argument
+
+}; // class ConnectBridgedSessionsOperation
+
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.cpp b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
new file mode 100644
index 0000000..f2ea73e
--- /dev/null
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.cpp
@@ -0,0 +1,188 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include "logger.h"
+
+#include "ConnectBridgedSessionsWithDestinationOperation.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace // unnamed namespace: the logger reference below is local to this file
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService"); // logger used by every lg(...) call in this file
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Constructs an operation that replaces one session in a Bridge with a new session routable by the
+ * destination param. The class is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type
+ * of the connectBridgedSessionsWithDestination() AMD callback handler, which allows this object to send results to
+ * the initiator of this operation.
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ *
+ * @param cb AMD callback of the initiator; passed to the base class and also kept in mInitiatorCallback.
+ * @param sessionToReplace Session proxy stored in mSessionToReplace.
+ * @param destination Destination identifier; copied into mDestination.
+ * @param current Ice invocation context; copied into mIceCurrent.
+ * @param context Passed through to the SessionRouterOperation base class.
+ * @param listener Passed through to the SessionRouterOperation base class.
+ */
+ConnectBridgedSessionsWithDestinationOperation::ConnectBridgedSessionsWithDestinationOperation(const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>(cb,
+ context,
+ listener,
+ boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)), // lookupState() bound to this object
+ mInitiatorCallback(cb),
+ mSessionToReplace(sessionToReplace),
+ mDestination(destination),
+ mIceCurrent(current)
+{
+}
+
+ConnectBridgedSessionsWithDestinationOperation::~ConnectBridgedSessionsWithDestinationOperation() // Destructor body only writes a Debug-level trace naming mDestination.
+{
+ lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
+}
+
+/**
+ * State handler bound in the constructor.
+ * Records the bridge of mSessionToReplace and the other sessions in that bridge,
+ * attaches a session listener, selects establishBridgeState() as the next state
+ * and starts the endpoint lookup for mDestination.
+ *
+ * A failure of either bridge query is reported through finishWithException().
+ */
+void ConnectBridgedSessionsWithDestinationOperation::lookupState()
+{
+    lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << mDestination;
+
+    // Both calls fail the operation in the same way, so they share one handler.
+    try
+    {
+        mBridge = mSessionToReplace->getBridge();
+        mRemainingSessions = getSessionsInBridge(mBridge, mSessionToReplace);
+    }
+    catch (const Ice::Exception &e)
+    {
+        finishWithException(e);
+        return;
+    }
+
+    // Create the listener used to detect early termination. It is kept in
+    // mListenerManager, so it outlives this method and is consulted again in
+    // establishBridgeState().
+    // NOTE(review): the listener is attached to mSessionToReplace, whereas
+    // ConnectBridgedSessionsOperation attaches it to the sessions that remain in the
+    // bridge -- confirm which set is intended here.
+    lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
+    SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
+    mListenerManager = listener;
+
+    // Route the destination
+    lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
+
+    // Set the state to execute after lookup.
+    setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+
+    // Lookup the destination. This will use AMI, and the callback should
+    // schedule us to execute again.
+    // (A second "Routing destination" trace, mislabelled "routeSession()", used to be
+    // emitted here; it duplicated the message logged above and has been dropped.)
+    lookupEndpoints(mDestination, mIceCurrent);
+}
+
+/**
+ * State handler selected by lookupState().
+ * Entering this state, the destination endpoint has been obtained.
+ * This operation is invoked from the worker thread, having been scheduled by
+ * the callback from lookup.
+ *
+ * Creates sessions for the looked-up endpoints, replaces mSessionToReplace in
+ * mBridge with them and forwards start() to the new sessions. Every failure is
+ * reported through finishWithException(); success ends with finishAndSendResult().
+ */
+void ConnectBridgedSessionsWithDestinationOperation::establishBridgeState()
+{
+    if (mFinished)
+    {
+        return;
+    }
+
+    assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed.
+    if (mLookupResult.size() < 1)
+    {
+        finishWithException(DestinationNotFoundException(mDestination));
+        return;
+    }
+
+    // Add a session
+    SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination, mListenerManager);
+
+    if (mListenerManager->getListener()->getNumSessions() < 2)
+    {
+        lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << mDestination << " in connectBridgedSessionsWithDestination(). " ;
+        finishWithException(SessionCreationException(mDestination));
+        return;
+    }
+
+    if (mListenerManager->getListener()->isTerminated())
+    {
+        lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
+
+        // mRemainingSessions is empty when the replaced session was alone in the
+        // bridge, so it must not be indexed unchecked.
+        if (mRemainingSessions.size() > 0)
+        {
+            finishWithException(SourceTerminatedPreBridgingException(mRemainingSessions[0]->getEndpoint()->getId()));
+        }
+        else
+        {
+            finishWithException(SourceTerminatedPreBridgingException(""));
+        }
+        return;
+    }
+
+    // We're through listening, and we will probably interfere with the Bridge's functionality if
+    // we keep listening.
+    mListenerManager->getListener()->unregister();
+
+    // Modify the bridge
+    try
+    {
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << mDestination;
+        mBridge->replaceSession(mSessionToReplace, newSessions);
+    }
+    catch (const Ice::Exception &e)
+    {
+        lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
+
+        // Same emptiness guard as above; this runs inside a handler, where an
+        // out-of-range access would be especially hard to diagnose.
+        if (mRemainingSessions.size() > 0)
+        {
+            finishWithException(BridgingException(mRemainingSessions[0]->getEndpoint()->getId(), mDestination));
+        }
+        else
+        {
+            finishWithException(BridgingException("", mDestination));
+        }
+        return;
+    }
+
+    lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
+
+    try
+    {
+        forwardStart(newSessions);
+    }
+    catch (const Ice::Exception &e)
+    {
+        // The prefix used to read "routeSession()", copied from RouteSessionOperation.
+        lg(Debug) << "connectBridgedSessionsWithDestination(): Exception forwarding start: " << e.what();
+
+        finishWithException(e);
+        return;
+    }
+
+    // This operation is complete. Send AMD responses.
+    finishAndSendResult();
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/ConnectBridgedSessionsWithDestinationOperation.h b/src/ConnectBridgedSessionsWithDestinationOperation.h
new file mode 100644
index 0000000..95655c5
--- /dev/null
+++ b/src/ConnectBridgedSessionsWithDestinationOperation.h
@@ -0,0 +1,79 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "RoutingIf.h"
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+#include "SessionRouterOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable by the
+ * destination param. This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type
+ * of the connectBridgedSessionsWithDestination() AMD callback handler, which allows this object to send results to
+ * the initiator of this operation.
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+ class ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+{
+public:
+ ConnectBridgedSessionsWithDestinationOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener);
+
+ virtual ~ConnectBridgedSessionsWithDestinationOperation();
+
+private:
+ /**
+ * State handler that starts the lookup of the destination.
+ */
+ void lookupState();
+
+ /**
+ * State handler that completes the operation.
+ * Entering this state, the destination endpoint has been obtained.
+ * This operation is invoked from the worker thread, having been scheduled by
+ * the callback from lookup.
+ */
+ void establishBridgeState();
+
+private:
+ // Operation input parameters, stored as given to the constructor.
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback; // AMD callback of the initiator
+ AsteriskSCF::SessionCommunications::V1::SessionPrx mSessionToReplace; // constructor's sessionToReplace argument
+ std::string mDestination; // constructor's destination argument
+ ::Ice::Current mIceCurrent; // constructor's current argument
+
+ // Implementation state carried from one state handler to the next.
+ AsteriskSCF::SessionCommunications::V1::BridgePrx mBridge; // bridge proxy held across states
+ AsteriskSCF::SessionCommunications::V1::SessionSeq mRemainingSessions; // session sequence held across states
+
+}; // class ConnectBridgedSessionsWithDestinationOperation
+
+
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 6340009..542a1b0 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -66,6 +66,7 @@ public:
};
typedef map<std::string, RegisteredLocator>::iterator EndpointLocatorMapIterator;
+typedef map<std::string, RegisteredLocator>::const_iterator EndpointLocatorMapConstIterator;
typedef map<std::string, RegisteredLocator> EndpointLocatorMap;
/**
@@ -153,8 +154,8 @@ public:
/**
* Collect results of AMI lookups from multiple EndpointLocators.
*/
- void collectResult(const EndpointSeq& endpoints)
- {
+ void lookupResult(const EndpointSeq& endpoints)
+ {
boost::lock_guard<boost::mutex> guard(mLock);
if ((endpoints.size() > 0) && mCallback)
@@ -211,45 +212,6 @@ private:
};
typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
-/**
- * Callback with the results for EndpointLocator::lookup. This
- * implementation simply forwards the info on to a LookupCollector.
- *
- * @see EndpointLocator::lookup
- * @see LookupCollector
- */
-class LookupCallback : public IceUtil::Shared
-{
-public:
- LookupCallback(const LookupResultCollectorPtr& collector) :
- mCollector(collector)
- {
- }
-
- ~LookupCallback()
- {
- lg(Debug) << "LookupCallback being destroyed. ";
- }
-
- void lookupResult(const EndpointSeq& endpoints)
- {
- // delegation to thread safe object
- // no lock needed
- mCollector->collectResult(endpoints);
- mCollector = 0;
- }
-
- void fail(const Ice::Exception &e)
- {
- mCollector->fail(e);
- mCollector = 0;
- }
-
-private:
- LookupResultCollectorPtr mCollector;
-};
-typedef IceUtil::Handle<LookupCallback> LookupCallbackPtr;
-
/**
* Constructor.
@@ -312,14 +274,11 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
// Invoke an AMI lookup on each endpointLocator that might be able to satisfy this lookup.
for(std::vector<EndpointLocatorPrx>::iterator locator = locatorsToTry.begin(); locator != locatorsToTry.end(); ++locator)
{
- // Create our typesafe callback
- LookupCallbackPtr callback(new LookupCallback(lookupResultCollector));
-
- // Wrap our callback for AMI
+ // Wrap our results-collecting callback for AMI
Callback_EndpointLocator_lookupPtr lookupCallback =
- newCallback_EndpointLocator_lookup(callback,
- &LookupCallback::lookupResult,
- &LookupCallback::fail);
+ newCallback_EndpointLocator_lookup(lookupResultCollector,
+ &LookupResultCollector::lookupResult,
+ &LookupResultCollector::fail);
// Start AMI invocation
lg(Debug) << "EndpointRegistry::lookup() invoke a lookup for " << destination;
(*locator)->begin_lookup(destination, lookupCallback);
@@ -328,6 +287,27 @@ void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_
}
/**
+ * Non-member utiltity to do thread-safe check for existing map entry.
+ */
+bool locatorExists(const EndpointLocatorMap& map, const std::string& locatorId, boost::shared_mutex& mtx, EndpointLocatorMapConstIterator& existing)
+{
+ EndpointLocatorMapConstIterator end;
+
+ { // critical scope
+ boost::shared_lock<boost::shared_mutex> lock(mtx);
+
+ existing = map.find(locatorId);
+ end = map.end();
+ }
+
+ if (existing == end)
+ {
+ return false;
+ }
+ return true;
+}
+
+/**
* Register an EndpointLocator that can provide endpoints.
* @param id A unique identifier for the added EndpointLocator.
* @param destinationIdRangeList A set of regular expressions that define the valid endpoint ids
@@ -340,17 +320,10 @@ void EndpointRegistry::addEndpointLocator(const std::string& locatorId, const Re
{
lg(Debug) << "EndpointRegistry::addEndpointLocator() adding locator for " << locatorId << ". Proxy details: " << locator->ice_toString() << std::endl;
- EndpointLocatorMapIterator existing(0);
- EndpointLocatorMapIterator end(0);
+ EndpointLocatorMapIterator existing;
+ bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
- { // critical scope
- boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
-
- existing = mImpl->mEndpointLocatorMap.find(locatorId);
- end = mImpl->mEndpointLocatorMap.end();
- }
-
- if (existing != end)
+ if (exists)
{
mImpl->eraseLocatorMapItem(existing);
lg(Info) << "Received request to add endpoint with id " << locatorId << " which already exists. Replacing with new proxy.";
@@ -380,17 +353,10 @@ void EndpointRegistry::removeEndpointLocator(const std::string& locatorId, const
{
lg(Debug) << "EndpointRegistry::removeEndpointLocator() removing locator " << locatorId;
- EndpointLocatorMapIterator existing(0);
- EndpointLocatorMapIterator end(0);
-
- { // critical scope
- boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ EndpointLocatorMapIterator existing;
+ bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
- existing = mImpl->mEndpointLocatorMap.find(locatorId);
- end = mImpl->mEndpointLocatorMap.end();
- }
-
- if (existing == end)
+ if (!exists) // An unknown id is the failure case; the code this replaces tested (existing == end).
{
lg(Warning) << "Received request to remove Endpoint Locator not currently registered. Id = " << locatorId;
mImpl->mEventPublisher->removeEndpointLocatorEvent(locatorId, Event::FAILURE);
@@ -421,16 +387,10 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
{
try
{
- EndpointLocatorMapIterator existing(0);
- EndpointLocatorMapIterator end(0);
-
- { // critical scope
- boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
- EndpointLocatorMapIterator existing = mImpl->mEndpointLocatorMap.find(locatorId);
- end = mImpl->mEndpointLocatorMap.end();
- }
+ EndpointLocatorMapIterator existing;
+ bool exists = locatorExists(mImpl->mEndpointLocatorMap, locatorId, mImpl->mLock, existing);
- if (existing == end)
+ if (!exists)
{
mImpl->mEventPublisher->setEndpointLocatorDestinationIdsEvent(locatorId, regExList, Event::FAILURE);
throw DestinationNotFoundException(locatorId);
diff --git a/src/RouteSessionOperation.cpp b/src/RouteSessionOperation.cpp
new file mode 100644
index 0000000..b3c8faf
--- /dev/null
+++ b/src/RouteSessionOperation.cpp
@@ -0,0 +1,184 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include "logger.h"
+
+#include "RouteSessionOperation.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::Core::Routing::V1;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace // unnamed namespace: the logger reference below is local to this file
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService"); // logger used by every lg(...) call in this file
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Constructs the operation object for routeSession(). The class is a
+ * specialization of SessionRouterOperation<T>; the template parameter T is the type of the routeSession()
+ * AMD callback handler, which allows this object to send results to the initiator of this
+ * operation.
+ *
+ * This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread.
+ *
+ * @param cb AMD callback of the initiator; passed to the base class and also kept in mInitiatorCallback.
+ * @param source Session proxy stored in mSource.
+ * @param destination Destination identifier; copied into mDestination.
+ * @param current Ice invocation context; copied into mIceCurrent.
+ * @param context Passed through to the SessionRouterOperation base class.
+ * @param listener Passed through to the SessionRouterOperation base class.
+ */
+RouteSessionOperation::RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>(cb,
+ context,
+ listener,
+ boost::bind(&RouteSessionOperation::lookupState, this)), // lookupState() bound to this object
+ mInitiatorCallback(cb),
+ mSource(source),
+ mDestination(destination),
+ mIceCurrent(current)
+{
+}
+
+RouteSessionOperation::~RouteSessionOperation() // Destructor body only writes a Debug-level trace naming mDestination.
+{
+ lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
+}
+
+/**
+ * State handler bound in the constructor.
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ *
+ * Fails the operation with a BridgingException when the bridge manager cannot be
+ * initialized; otherwise attaches a listener to the source session, selects
+ * establishBridgeState() as the next state and starts the endpoint lookup.
+ */
+void RouteSessionOperation::lookupState()
+{
+    lg(Debug) << "routeSession() entered with destination " << mDestination;
+
+    // Nothing can be bridged without a bridge manager, so bail out first.
+    if (!mSessionContext.bridgeManager.initializeOnce())
+    {
+        lg(Error) << "No proxy to BridgeManager. Make sure all services are running.";
+        finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+        return;
+    }
+
+    // Listen on the source so that early termination can be detected. The manager
+    // is held in mListenerManager and is used again by establishBridgeState().
+    mListenerManager = SessionListenerManagerPtr(new SessionListenerManager(mSessionContext.adapter, mSource));
+
+    // Choose the handler to execute once the endpoints have been looked up.
+    setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+
+    // Lookup the destination. This will use AMI, and the callback will
+    // schedule us to execute again.
+    lg(Debug) << "routeSession(): Routing destination " << mDestination;
+    lookupEndpoints(mDestination, mIceCurrent);
+}
+
+/**
+ * State handler selected by lookupState().
+ * Entering this state, the destination endpoint has been obtained. This state
+ * completes the operation by creating the bridge.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+void RouteSessionOperation::establishBridgeState()
+{
+    if (mFinished)
+    {
+        return;
+    }
+
+    // This exception should have been handled in EndpointRegistry if lookup failed.
+    assert(mLookupResult.size() != 0);
+    if (mLookupResult.size() == 0)
+    {
+        finishWithException(DestinationNotFoundException(mDestination));
+        return;
+    }
+
+    // Create a session on each endpoint that was found.
+    SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination, mListenerManager);
+
+    // Fewer than two sessions on the listener is reported as a session creation failure.
+    if (mListenerManager->getListener()->getNumSessions() < 2)
+    {
+        finishWithException(SessionCreationException(mDestination));
+        return;
+    }
+
+    // The listener reports whether the source went away in the meantime.
+    if (mListenerManager->getListener()->isTerminated())
+    {
+        finishWithException(SourceTerminatedPreBridgingException(mSource->getEndpoint()->getId()));
+        return;
+    }
+
+    // We're through listening, and we will probably interfere with the
+    // Bridge's functionality if we keep listening.
+    mListenerManager->getListener()->unregister();
+
+    // Create the bridge: source first, followed by every newly created session.
+    BridgePrx bridge;
+    try
+    {
+        SessionSeq bridgedSessions;
+        bridgedSessions.reserve(newSessions.size() + 1);
+        bridgedSessions.push_back(mSource);
+        bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+
+        lg(Debug) << "routeSession(): Creating bridge.";
+        bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+    }
+    catch (const Ice::Exception &e)
+    {
+        lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+        finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+        return;
+    }
+
+    // Forward the start to all the destinations routed to.
+    lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
+    try
+    {
+        forwardStart(newSessions);
+    }
+    catch (const Ice::Exception &e)
+    {
+        lg(Debug) << "routeSession(): Exception forwarding start: " << e.what();
+        finishWithException(e);
+        return;
+    }
+
+    // This operation is complete. Send AMD responses.
+    finishAndSendResult();
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RouteSessionOperation.h b/src/RouteSessionOperation.h
new file mode 100644
index 0000000..2100853
--- /dev/null
+++ b/src/RouteSessionOperation.h
@@ -0,0 +1,78 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "RoutingIf.h"
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+#include "SessionRouterOperation.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+ /**
+ * This class derives from SessionRouterOperation<T> to handle the
+ * routeSession() operation. The template parameter T is the type of the routeSession()
+ * AMD callback handler, which allows this object to send results to the initiator of
+ * the operation.
+ *
+ * This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread.
+ */
+class RouteSessionOperation : public SessionRouterOperation<AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr>
+{
+public:
+ RouteSessionOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb, // AMD callback used to answer the initiator.
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source, // Session being routed.
+ const ::std::string& destination, // Destination the source session is routed to.
+ const ::Ice::Current& current, // Ice::Current of the originating invocation (presumably; confirm against the .cpp).
+ const SessionContext& context, // Shared context used by the operation (e.g. the bridge manager).
+ OperationsManager* const listener); // NOTE(review): named "listener" but typed OperationsManager; confirm its role.
+
+ virtual ~RouteSessionOperation();
+
+private:
+ /**
+ * This is a state handler for one of this operation's states.
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+ void lookupState();
+
+ /**
+ * This is a state handler for one of this operation's states.
+ * On entering this state, the destination endpoint has been obtained. This state
+ * completes the operation by creating the bridge.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+ void establishBridgeState();
+
+private:
+ // Operation input parameters.
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr mInitiatorCallback; // NOTE(review): the base template may already hold a member of this name; confirm this does not shadow it.
+ AsteriskSCF::SessionCommunications::V1::SessionPrx mSource; // Source session; placed first in the bridged session list.
+ std::string mDestination; // Destination identifier; also reported in the routing exceptions.
+ ::Ice::Current mIceCurrent; // Copy of the Ice::Current passed to the constructor (presumably; confirm against the .cpp).
+
+}; // class RouteSessionOperation
+
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 9bceb87..99a132a 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -99,7 +99,7 @@ public:
}
catch (const Ice::NoEndpointException&)
{
- assert(0); // All operations callable via icestorm must support oneway.
+ assert(false); // All operations callable via icestorm must support oneway.
}
mEventTopic = Event::RoutingEventsPrx::uncheckedCast(oneway);
@@ -112,6 +112,8 @@ public:
*/
bool isInitialized()
{
+ boost::lock_guard<boost::mutex> lock(mLock);
+
if (mInitialized)
{
return true;
@@ -177,12 +179,9 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- { // scope for the lock
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
- if (!mImpl->isInitialized())
- {
- return;
- }
+ if (!mImpl->isInitialized())
+ {
+ return;
}
try
@@ -201,12 +200,9 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- { // scope for the lock
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
- if (!mImpl->isInitialized())
- {
- return;
- }
+ if (!mImpl->isInitialized())
+ {
+ return;
}
try
@@ -226,12 +222,9 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- { // scope for the lock
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
- if (!mImpl->isInitialized())
- {
- return;
- }
+ if (!mImpl->isInitialized())
+ {
+ return;
}
try
@@ -249,12 +242,9 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
*/
void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
{
- { // scope for the lock
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
- if (!mImpl->isInitialized())
- {
- return;
- }
+ if (!mImpl->isInitialized())
+ {
+ return;
}
try
@@ -272,12 +262,9 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
*/
void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
{
- { // scope for the lock
- boost::lock_guard<boost::mutex> lock(mImpl->mLock);
- if (!mImpl->isInitialized())
- {
- return;
- }
+ if (!mImpl->isInitialized())
+ {
+ return;
}
try
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
new file mode 100644
index 0000000..b39af59
--- /dev/null
+++ b/src/SessionListener.cpp
@@ -0,0 +1,255 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011 Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/thread/thread.hpp>
+
+#include "logger.h"
+
+#include "SessionRouter.h"
+#include "SessionListener.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::System::Logging;
+
+namespace
+{
+Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService"); // File-local logger used by every function in this file.
+}
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+SessionListenerImpl::SessionListenerImpl() :
+ mTerminated(false), mListenerPrx(0) // Starts out not terminated and without a proxy; setProxy() supplies one later.
+{
+}
+
+SessionListenerImpl::~SessionListenerImpl()
+{
+}
+
+
+/**
+ * SessionListener override, called when a monitored session has stopped.
+ * Flags this listener as terminated and forwards the stop, with the same
+ * response code, to every tracked session other than the one that stopped.
+ *
+ * @param session The session that reported the stop.
+ * @param responseCode Response code passed along to the other sessions.
+ */
+void SessionListenerImpl::stopped(const SessionPrx& session, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& responseCode, const Ice::Current&)
+{
+    SessionSeq cacheSessions;
+    {
+        // mTerminated is polled through isTerminated(), potentially from a
+        // different thread than the one delivering this callback, so it is
+        // written under the exclusive lock instead of unsynchronized.
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mTerminated = true;
+        cacheSessions = mSessions;
+    }
+
+    // Forward the stop message to all sessions other than the one that originally sent it.
+    // The calls are made on the copy so that mLock is not held across proxy invocations.
+    for(SessionSeq::iterator s = cacheSessions.begin(); s != cacheSessions.end(); ++s)
+    {
+        try
+        {
+            if (session->ice_getIdentity() != (*s)->ice_getIdentity())
+            {
+                (*s)->stop(responseCode);
+            }
+        }
+        catch(const Ice::Exception &e)
+        {
+            // A failure on one session must not prevent the stop reaching the others.
+            lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what();
+        }
+    }
+}
+
+/**
+ * Track a session without attaching this listener to it.
+ *
+ * @param session The session to remember.
+ */
+void SessionListenerImpl::addSession(const SessionPrx& session)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    mSessions.push_back(session);
+}
+
+/**
+ * Add a session to be tracked by this listener, and attach this listener to the session.
+ * The session stays tracked even if attaching the listener fails; the failure is logged.
+ *
+ * @param session The session to remember and listen to.
+ */
+void SessionListenerImpl::addSessionAndListen(SessionPrx session)
+{
+    { // critical scope
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mSessions.push_back(session);
+    }
+
+    // Without a proxy (setProxy() not called yet) there is nothing to attach.
+    if (mListenerPrx != 0)
+    {
+        try
+        {
+            lg(Debug) << "Adding listener to session." ;
+            session->addListener(mListenerPrx);
+        }
+        catch(const Ice::Exception &e)
+        {
+            lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what();
+        }
+    }
+}
+
+/**
+ * @return The number of sessions currently tracked.
+ */
+const int SessionListenerImpl::getNumSessions()
+{
+    boost::shared_lock<boost::shared_mutex> lock(mLock);
+    // The declared return type is int; make the narrowing from size_type explicit.
+    return static_cast<int>(mSessions.size());
+}
+
+/**
+ * @return true once stopped() has been received for any monitored session.
+ */
+bool SessionListenerImpl::isTerminated()
+{
+    // Read under the lock that stopped() holds when it sets the flag.
+    boost::shared_lock<boost::shared_mutex> lock(mLock);
+    return mTerminated;
+}
+
+/**
+ * Detach this listener from every session being monitored, then drop the
+ * references to those sessions.
+ */
+void SessionListenerImpl::unregister()
+{
+    // Work on a snapshot so that no lock is held during the proxy calls below.
+    SessionSeq snapshot;
+    {
+        boost::shared_lock<boost::shared_mutex> readLock(mLock);
+        snapshot = mSessions;
+    }
+
+    lg(Debug) << "unregister() called with " << snapshot.size() << " sessions to remove listener from.";
+
+    for (SessionSeq::size_type index = 0; index < snapshot.size(); ++index)
+    {
+        SessionPrx& tracked = snapshot[index];
+        try
+        {
+            lg(Debug) << "Removing listener from session " << tracked->ice_toString();
+            tracked->removeListener(mListenerPrx);
+        }
+        catch(const Ice::Exception &removeError)
+        {
+            // Keep going: one unreachable session must not block the others.
+            lg(Error) << "Exception removing listener from session " << tracked->ice_toString() << ". Details: " << removeError.what();
+        }
+    }
+
+    // Listening is over, so the tracked sessions are no longer needed.
+    boost::unique_lock<boost::shared_mutex> writeLock(mLock);
+    mSessions.clear();
+}
+
+SessionListenerPrx SessionListenerImpl::getProxy() // Returns the proxy stored by setProxy(); null until setProxy() has been called.
+{
+ return mListenerPrx;
+}
+
+void SessionListenerImpl::setProxy(const SessionListenerPrx& prx) // Stores the proxy that is handed to sessions by addSessionAndListen() and unregister().
+{
+ mListenerPrx = prx;
+}
+
+/**
+ * Constructor for SessionListenerManager monitoring a single session.
+ * Creates the listener servant, registers it with the adapter under a
+ * generated identity and attaches it to the given session.
+ *
+ * @param adapter Object adapter that hosts the listener servant.
+ * @param session Session to monitor.
+ */
+SessionListenerManager::SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
+    : mSessionListener(new SessionListenerImpl()),
+      mAdapter(adapter)
+{
+    Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
+
+    try
+    {
+        // The servant behind prx was created just above, so its type is known
+        // and an unchecked cast (which makes no invocation) is sufficient.
+        mSessionListener->setProxy(SessionListenerPrx::uncheckedCast(prx));
+
+        mSessionListener->addSessionAndListen(session);
+    }
+    catch(const std::exception& e)
+    {
+        // Do NOT remove the servant from the adapter here. The adapter holds the
+        // only reference-counted handle to it, so removing it would delete the
+        // servant while mSessionListener still points at it, and both
+        // getListener() and the destructor dereference that pointer. The
+        // destructor performs the removal.
+        lg(Error) << "Exception initializing session listener: " << e.what();
+    }
+    catch(...)
+    {
+        lg(Error) << "Unknown exception initializing session listener.";
+    }
+}
+
+/**
+ * Constructor for SessionListenerManager monitoring several sessions.
+ * Creates the listener servant, registers it with the adapter under a
+ * generated identity and attaches it to every session in the sequence.
+ *
+ * @param adapter Object adapter that hosts the listener servant.
+ * @param sessionSequence Sessions to monitor.
+ */
+SessionListenerManager::SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionSeq& sessionSequence)
+    : mSessionListener(new SessionListenerImpl()),
+      mAdapter(adapter)
+{
+    Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
+
+    try
+    {
+        // The servant behind prx was created just above, so its type is known
+        // and an unchecked cast (which makes no invocation) is sufficient.
+        mSessionListener->setProxy(SessionListenerPrx::uncheckedCast(prx));
+
+        for(SessionSeq::const_iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
+        {
+            mSessionListener->addSessionAndListen(*s);
+        }
+    }
+    catch(const std::exception& e)
+    {
+        // Do NOT remove the servant from the adapter here. The adapter holds the
+        // only reference-counted handle to it, so removing it would delete the
+        // servant while mSessionListener still points at it, and both
+        // getListener() and the destructor dereference that pointer. The
+        // destructor performs the removal.
+        lg(Error) << "Exception initializing session listener: " << e.what();
+    }
+    catch(...)
+    {
+        lg(Error) << "Unknown exception initializing session listener.";
+    }
+}
+
+/**
+ * Tears the listener down in two independently guarded steps: first detach it
+ * from its sessions, then remove the servant from the object adapter.
+ */
+SessionListenerManager::~SessionListenerManager()
+{
+    // The listener servant is registered with the adapter and cannot undo that
+    // itself without being deleted, so the teardown is sequenced here.
+
+    // Step 1: stop listening to the sessions.
+    try
+    {
+        lg(Debug) << "About to unregister the listener..." ;
+        mSessionListener->unregister();
+    }
+    catch(const std::exception& unregisterError)
+    {
+        lg(Debug) << "Exception unregistering: " << unregisterError.what() ;
+    }
+
+    // Step 2: remove the servant. The adapter holds the only smart pointer to
+    // it, so this causes the servant to be deleted.
+    try
+    {
+        lg(Debug) << "Removing listener from object adapter." ;
+        SessionListenerPrx listenerProxy = mSessionListener->getProxy();
+        mAdapter->remove(listenerProxy->ice_getIdentity());
+    }
+    catch(const std::exception& removeError)
+    {
+        lg(Debug) << "Exception removing listener from Object Adatper " << removeError.what() ;
+    }
+}
+
+SessionListenerImpl* SessionListenerManager::getListener() const // Returns the listener servant created by the constructor.
+{
+ return mSessionListener;
+}
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/SessionListener.h b/src/SessionListener.h
new file mode 100644
index 0000000..0466b1b
--- /dev/null
+++ b/src/SessionListener.h
@@ -0,0 +1,117 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010-2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/thread/shared_mutex.hpp>
+
+#include <Ice/Ice.h>
+
+#include "SessionCommunications/SessionCommunicationsIf.h"
+
+namespace AsteriskSCF
+{
+namespace BasicRoutingService
+{
+
+/**
+ * Listener used to monitor sessions during the routing process. Primarily used to
+ * ensure the relevant sessions haven't stopped prior to being bridged.
+ */
+class SessionListenerImpl : public AsteriskSCF::SessionCommunications::V1::SessionListener
+{
+public:
+ SessionListenerImpl();
+ ~SessionListenerImpl();
+
+public: // Overrides for SessionListener
+
+ void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const Ice::Current&) {} // No-op
+
+ void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const Ice::Current&) {} // No-op
+
+ void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const Ice::Current&) {} // No-op
+
+ void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&,
+ const Ice::Current&) {} // No-op
+
+ void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const Ice::Current&) {} // No-op
+
+ void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const Ice::Current&) {} // No-op
+
+ void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+ const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& responseCode,
+ const Ice::Current&); // Marks the listener terminated and forwards the stop to the other tracked sessions.
+
+public: // Impl operations
+
+ /**
+ * Adds a session to be tracked by the listener. Does not attach the listener to the session.
+ */
+ void addSession(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+ /**
+ * Add a session to be tracked by this listener, and attach this listener to the session.
+ */
+ void addSessionAndListen(AsteriskSCF::SessionCommunications::V1::SessionPrx session);
+ const int getNumSessions(); // Number of sessions currently tracked.
+ bool isTerminated() ; // True once stopped() has been received.
+
+ /**
+ * Stop listening to all sessions we're monitoring, and stop tracking them.
+ */
+ void unregister();
+ AsteriskSCF::SessionCommunications::V1::SessionListenerPrx getProxy(); // Proxy stored by setProxy(); null before that.
+ void setProxy(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& prx); // Stores the proxy handed to sessions when attaching/detaching.
+
+private:
+ boost::shared_mutex mLock; // Guards mSessions.
+
+ AsteriskSCF::SessionCommunications::V1::SessionSeq mSessions; // Sessions currently tracked by this listener.
+ bool mTerminated; // Set to true by stopped(); reported by isTerminated().
+ AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListenerPrx; // This listener's own proxy, as supplied to setProxy().
+
+}; // class SessionListenerImpl
+
+typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
+
+/**
+ * This class manages the lifecycle of a session listener.
+ */
+class SessionListenerManager
+{
+public:
+ SessionListenerManager(Ice::ObjectAdapterPtr adapter, const AsteriskSCF::SessionCommunications::V1::SessionPrx& session); // Registers a new listener with the adapter and listens to one session.
+ SessionListenerManager(Ice::ObjectAdapterPtr adapter, const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessionSequence); // Same, listening to every session in the sequence.
+
+ ~SessionListenerManager(); // Detaches the listener from its sessions, then removes it from the adapter.
+ SessionListenerImpl* getListener() const; // Returns the managed listener.
+
+private:
+ SessionListenerImpl* mSessionListener; // Raw pointer to the listener servant; the adapter holds the reference-counted handle.
+ Ice::ObjectAdapterPtr mAdapter; // Adapter the listener servant is registered with.
+
+}; // class SessionListenerManager
+
+typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
+
+} // end BasicRoutingService
+} // end AsteriskSCF
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index fcf6eef..5bba4a5 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -19,12 +19,18 @@
#include <boost/function.hpp>
#include <boost/bind.hpp>
+#include "AsteriskSCF/WorkQueue/WorkQueue.h"
+
#include "SessionRouter.h"
+#include "RouteSessionOperation.h"
+#include "ConnectBridgedSessionsOperation.h"
+#include "ConnectBridgedSessionsWithDestinationOperation.h"
+
+#include "SessionListener.h"
#include "EndpointRegistry.h"
#include "RoutingIf.h"
#include "EndpointIf.h"
#include "logger.h"
-#include "WorkQueue.h"
using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
@@ -40,1148 +46,11 @@ namespace
Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.BasicRoutingService");
}
-/**
- * TBD... This code should be refactored for AMD and use AMI on outgoing calls.
- */
namespace AsteriskSCF
{
namespace BasicRoutingService
{
-/**
- * A simple utility to make the retry process a little cleaner until a SmartProxy
- * or other such mechanism has this functionality built-in.
- */
-class RetryPolicy
-{
-public:
- /**
- * Constructor:
- * @param maxRetries Maximum number of times to retry.
- * @intervalInMilliseconds Will sleep this amount in the retry() method.
- */
- RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
- mMaxRetries(maxRetries),
- mRetryIntervalMilliseconds(intervalInMilliseconds),
- mCounter(0)
- {
- assert(maxRetries < 0xffff);
- }
-
- /**
- * Indicates whether additional retries are warrented.
- */
- bool canRetry()
- {
- return mCounter <= mMaxRetries;
- }
-
- /**
- * User must call this after each failed attempt. Applies the delay between calls and does
- * bookkeeping.
- */
- bool retry()
- {
- ++mCounter;
- lg(Debug) << "Retrying for the " << mCounter << " time.";
-
- bool doRetry = canRetry();
-
- if (doRetry)
- {
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryIntervalMilliseconds));
- }
-
- return doRetry;
- }
-
- /**
- * Accessor for the number of retries allowed.
- */
- size_t maxRetries()
- {
- return mMaxRetries;
- }
-
-private:
- size_t mMaxRetries;
- size_t mRetryIntervalMilliseconds;
- size_t mCounter;
-
-}; // class RetryPolicy
-
-/**
- * Listener used to monitor sessions during the routing process. Primarily used to
- * insure the relevant sessions haven't stopped prior to being bridged.
- */
-class SessionListenerImpl : public SessionListener
-{
-public:
- SessionListenerImpl() :
- mTerminated(false), mListenerPrx(0)
- {
- }
-
- ~SessionListenerImpl()
- {
- }
-
-public: // The following operations are implementations of the SessionListener interface.
-
- void connected(const SessionPrx& session, const Ice::Current&)
- {
- }
-
- void flashed(const SessionPrx& session, const Ice::Current&)
- {
- }
-
- void held(const SessionPrx& session, const Ice::Current&)
- {
- }
-
- void progressing(const SessionPrx& session, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&)
- {
- }
-
- void ringing(const SessionPrx& session, const Ice::Current&)
- {
- }
-
- void stopped(const SessionPrx& session, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& responseCode, const Ice::Current&)
- {
- mTerminated = true;
-
- SessionSeq cacheSessions;
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- cacheSessions = mSessions;
- }
-
- // Forward the stop message to all sessions other than the one that originally sent it.
- for(SessionSeq::iterator s = cacheSessions.begin(); s != cacheSessions.end(); ++s)
- {
- try
- {
- if (session->ice_getIdentity() != (*s)->ice_getIdentity())
- {
- (*s)->stop(responseCode);
- }
- }
- catch(const Ice::Exception &e)
- {
- lg(Error) << "Session Listener unable to forward stop to session " << (*s)->ice_toString() << " due to " << e.what();
- }
- }
-
- }
-
- void unheld(const SessionPrx& session, const Ice::Current&)
- {
- }
-
-public:
-
- /**
- * Adds a session to be tracked by the listener. This operation doesn't actually call addListener on the Session.
- * When we ask an endpoint to create a session, we pass our listener in to the creation process.
- * So the listener has been attached to the session, but we want to keep track of it in this object.
- */
- void addSession(SessionPrx session)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.push_back(session);
- }
-
- /**
- * Add a session to be tracked by this listener, and attach this listener to the session.
- */
- void addSessionAndListen(SessionPrx session)
- {
- { // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.push_back(session);
- }
-
- if (mListenerPrx != 0)
- {
- try
- {
- lg(Debug) << "Adding listener to session." ;
- session->addListener(mListenerPrx);
- }
- catch(const Ice::Exception &e)
- {
- lg(Error) << "Exception adding listener to session " << session->ice_toString() << ". Details: " << e.what();
- }
- }
- }
-
- const int getNumSessions()
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- return mSessions.size();
- }
-
- bool isTerminated() // Lots of shoring up to do for asynchronous operations!
- {
- return mTerminated;
- }
-
- /**
- * Stop listening to all sessions we're monitoring.
- */
- void unregister()
- {
- SessionSeq sessionsToCall;
- { // critical scope
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- sessionsToCall = mSessions;
- }
-
- lg(Debug) << "unregister() called with " << sessionsToCall.size() << " sessions to remove listener from.";
-
- for(SessionSeq::iterator s=sessionsToCall.begin(); s != sessionsToCall.end(); ++s)
- {
- try
- {
- lg(Debug) << "Removing listener from session " << (*s)->ice_toString();
- (*s)->removeListener(mListenerPrx);
- }
- catch(const Ice::Exception &e)
- {
- lg(Error) << "Exception removing listener from session " << (*s)->ice_toString() << ". Details: " << e.what();
- }
- }
-
- // Since we're through listening to them, we should just drop our references to them.
- { // critical scope
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.clear();
- }
- }
-
- SessionListenerPrx getProxy()
- {
- return mListenerPrx;
- }
-
- void setProxy(const SessionListenerPrx& prx)
- {
- mListenerPrx = prx;
- }
-
-private:
- boost::shared_mutex mLock;
-
- SessionSeq mSessions;
- bool mTerminated;
- SessionListenerPrx mListenerPrx;
-
-}; // class SessionListenerImpl
-
-typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
-
-/**
- * This class manages the lifecycle of a session listener.
- */
-class SessionListenerManager
-{
-public:
- SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
- : mSessionListener(new SessionListenerImpl()),
- mAdapter(adapter)
- {
- Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
-
- try
- {
- SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
- mSessionListener->setProxy(listenerProxy);
-
- mSessionListener->addSessionAndListen(session);
- }
- catch(...)
- {
- try
- {
- mAdapter->remove(prx->ice_getIdentity());
- }
- catch(...)
- {
- }
- }
- }
-
- SessionListenerManager(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
- : mSessionListener(new SessionListenerImpl()),
- mAdapter(adapter)
- {
- Ice::ObjectPrx prx = adapter->addWithUUID(mSessionListener);
-
- try
- {
- SessionListenerPrx listenerProxy = SessionListenerPrx::checkedCast(prx);
- mSessionListener->setProxy(listenerProxy);
-
- for(SessionSeq::iterator s = sessionSequence.begin(); s != sessionSequence.end(); ++s)
- {
- mSessionListener->addSessionAndListen(*s);
- }
- }
- catch(...)
- {
- try
- {
- mAdapter->remove(prx->ice_getIdentity());
- }
- catch(...)
- {
- }
- }
- }
-
- ~SessionListenerManager()
- {
- // Our private SessionListener implementation adds itself as a servant. It
- // can't really undo that without getting itself deleted. So undo it
- // in proper order.
- try
- {
- lg(Debug) << "About to unregister the listener..." ;
-
- mSessionListener->unregister();
- }
- catch(const std::exception& e)
- {
- lg(Debug) << "Exception unregistering: " << e.what() ;
- }
-
- try
- {
- // Only the adapter holds a smart pointer for this servant, so this will
- // cause it to be deleted.
- lg(Debug) << "Removing listener from object adapter." ;
- mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
- }
- catch(const std::exception& e)
- {
- lg(Debug) << "Exception removing listener from Object Adatper " << e.what() ;
- }
- }
-
- SessionListenerImpl* getListener() const
- {
- return mSessionListener;
- }
-
-private:
- SessionListenerImpl *mSessionListener;
- Ice::ObjectAdapterPtr mAdapter;
-
-}; // class SessionListenerManager
-
-typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
-
-/**
- * Context required by all of the SessionRouter operations.
- * All of the items in the SessionContext provide thread-safe interfaces. (i.e. no lock required).
- */
-struct SessionContext
-{
-public:
- /**
- * Constructor. The BridgeManager isn't initialized. It's configured later via a setter
- * due to component initialization sequence.
- */
- SessionContext(const Ice::ObjectAdapterPtr& adapter,
- const EndpointRegistryPtr& registry,
- const RoutingEventsPtr& publisher,
- const boost::shared_ptr<WorkQueue>& workQueue)
- : adapter(adapter),
- endpointRegistry(registry),
- eventPublisher(publisher),
- workQueue(workQueue)
- {
- }
-
- Ice::ObjectAdapterPtr adapter;
- EndpointRegistryPtr endpointRegistry;
- RoutingEventsPtr eventPublisher;
- boost::shared_ptr<WorkQueue> workQueue;
- AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
-};
-
-/**
- * An interface for an object that manages a collection of SessionRouterOperations.
- */
-class OperationsManager
-{
-public:
- /**
- * Operations indicate they are finished by calling this method.
- */
- virtual void finished(WorkQueue::Work *) = 0;
-
- /**
- * Operations can reschedule themselves on the WorkQueue.
- */
- virtual void reschedule(WorkQueue::Work *) = 0;
-
-protected:
- OperationsManager() {} // Can't construct directly. This class is an interface.
-};
-
-// Forward-declaration
-template <typename T> class LookupCallback;
-
-/**
- * This is a base class for worker objects that offload SessionRouter operations
- * to a worker thead during an AMD invocation. It implements the WorkQueue::Work
- * interface so that it can be enqueued to a worker thread or thread pool.
- * The template parameter T is the type of the AMD callback type for the
- * particular operation.
- */
-template<typename T>
-class SessionRouterOperation : public WorkQueue::Work
-{
-public:
- /**
- * Constructor.
- * @param amdCallback The callback object to provide results to the initiator of this operation.
- * @param context The SessionContext provides references to key objects needed by each operation.
- * @param manager
- */
- SessionRouterOperation(const T& amdCallback,
- const SessionContext& context,
- OperationsManager* manager,
- const boost::function<void ()> &initialStateHandler)
- : mInitiatorCallback(amdCallback),
... 1792 lines suppressed ...
--
team/ken.hunt/route_async_routing.git
More information about the asterisk-scf-commits
mailing list