[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Dec 21 03:32:14 UTC 2010
branch "route_async" has been created
at d74afeba80c8b862e7021b8378bf38f92565d317 (commit)
- Log -----------------------------------------------------------------
commit d74afeba80c8b862e7021b8378bf38f92565d317
Author: Ken Hunt <ken.hunt at digium.com>
Date: Mon Dec 20 21:26:08 2010 -0600
Supporting asynchronous calls in routing service.
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 3920cc2..f246a22 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,6 +30,7 @@
#include "EndpointRegistry.h"
#include "RoutingAdmin.h"
#include "SessionRouter.h"
+#include "SimpleWorkQueue.h"
#include "IceLogger.h"
#include "logger.h"
@@ -56,11 +57,14 @@ namespace BasicRoutingService
class BasicRoutingServiceApp : public IceBox::Service
{
public:
- BasicRoutingServiceApp() :
- mDone(false), mInitialized(false), mRunning(false)
+ BasicRoutingServiceApp()
+ : mDone(false),
+ mInitialized(false),
+ mRunning(false),
+ mWorkQueue( new SimpleWorkQueue("SessionRouterWorkQueue", lg))
{
-
}
+
~BasicRoutingServiceApp()
{
// Smart pointers do your thing.
@@ -88,6 +92,7 @@ private:
bool mDone;
bool mInitialized;
bool mRunning;
+ boost::shared_ptr<SimpleWorkQueue> mWorkQueue;
std::string mAppName;
ServiceLocatorManagementPrx mServiceLocatorManagement;
@@ -261,7 +266,7 @@ void BasicRoutingServiceApp::initialize()
mAdapter->add(mEndpointRegistry, mCommunicator->stringToIdentity(RegistryLocatorObjectId));
// Create publish the SessionRouter interface.
- SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher));
+ SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher, mWorkQueue));
BasicSessionRouterPtr basicSessionPtr(rawSessionRouter);
mSessionRouter = basicSessionPtr;
mAdapter->add(rawSessionRouter, mCommunicator->stringToIdentity(SessionRouterObjectId));
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04c5eda..1f6833a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -19,6 +19,9 @@ asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.cpp)
asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
+asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 6eecc72..79b96b6 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -22,6 +22,7 @@
#include "ScriptProcessor.h"
#include "logger.h"
+using namespace ::AsteriskSCF::Core::Endpoint::V1;
using namespace ::AsteriskSCF::Core::Routing::V1;
using namespace ::AsteriskSCF::System::Logging;
using namespace ::AsteriskSCF::Core::Routing::V1::Event;
@@ -117,6 +118,130 @@ public:
};
/**
+ * A collector for lookup() operation AMI replies.
+ */
+class LookupResultCollector : public IceUtil::Shared
+{
+public:
+ /**
+ * Constructor.
+ * @param cb Ice callback.
+ * @param numVotes The number of times isSupported will be called.
+ */
+ LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback,
+ const std::string& destination,
+ const RoutingEventsPtr& eventPublisher,
+ int numVotes)
+ : mCallback(callback),
+ mNumVotes(numVotes),
+ mEventPublisher(eventPublisher),
+ mDestination(destination)
+ {
+ assert(mNumVotes >= 0);
+
+ if (mNumVotes == 0)
+ {
+ notifyFailed();
+ }
+ }
+
+ /**
+ * Collect results of AMI lookups from multiple EndpointLocators.
+ */
+ void collectResult(const EndpointSeq& endpoints)
+ {
+ boost::lock_guard<boost::mutex> guard(mLock);
+
+ if ((endpoints.size() > 0) && mCallback)
+ {
+ mCallback->ice_response(endpoints);
+
+ // clear the mCallback pointer so we only answer once
+ mCallback = 0;
+
+ lg(Debug) << "EndpointRegistry::lookup() found Endpoint for destination " << mDestination;
+
+ // Post event
+ mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
+ }
+
+ assert(mNumVotes > 0); // isSupported was called too many times
+
+ if (--mNumVotes == 0 && mCallback)
+ {
+ notifyFailed();
+ }
+ }
+
+ void fail(const Ice::Exception &e)
+ {
+ boost::lock_guard<boost::mutex> guard(mLock);
+
+ if (--mNumVotes == 0 && mCallback)
+ {
+ notifyFailed();
+ }
+ }
+
+ void notifyFailed()
+ {
+ DestinationNotFoundException e;
+ mCallback->ice_exception(e);
+
+ // clear the mCallback pointer so we only answer once
+ mCallback = 0;
+
+ // Post event
+ mEventPublisher->lookupEvent(mDestination, Event::FAILURE);
+
+ lg(Debug) << "EndpointRegistry::lookup() failed to find destination " << mDestination;
+ }
+
+private:
+ boost::mutex mLock;
+ AMD_EndpointLocator_lookupPtr mCallback;
+ int mNumVotes;
+ RoutingEventsPtr mEventPublisher;
+ std::string mDestination;
+};
+typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
+
+/**
+ * Callback with the results for EndpointLocator::lookup. This
+ * implementation simply forwards the info on to a LookupCollector.
+ *
+ * @see EndpointLocator::lookup
+ * @see LookupCollector
+ */
+class LookupCallback : public IceUtil::Shared
+{
+public:
+ LookupCallback(const LookupResultCollectorPtr& collector) :
+ mCollector(collector)
+ {
+ }
+
+ void lookupResult(const EndpointSeq& endpoints)
+ {
+ // delegation to thread safe object
+ // no lock needed
+ mCollector->collectResult(endpoints);
+ mCollector = 0;
+ }
+
+ void fail(const Ice::Exception &e)
+ {
+ mCollector->fail(e);
+ mCollector = 0;
+ }
+
+private:
+ LookupResultCollectorPtr mCollector;
+};
+typedef IceUtil::Handle<LookupCallback> LookupCallbackPtr;
+
+
+/**
* Constructor.
*/
EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher) :
@@ -124,6 +249,74 @@ EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const Routi
{
}
+
+/**
+ * Returns the endpoints that match the specified destination id.
+ * @param id String identifier of the the destination.
+ */
+void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& amdcallback,
+ const ::std::string& destination,
+ const ::Ice::Current&)
+{
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+
+ lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
+
+ string modifiedDestination(destination);
+ if (mImpl->mScriptProcessor.get() != 0)
+ {
+ if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
+ {
+ mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
+
+ lg(Error) << "lookup(): denied by confirmLookup() script.";
+ amdcallback->ice_response(endpoints);
+ return;
+ }
+ }
+
+ std::vector<EndpointLocatorPrx> locatorsToTry;
+
+ // Iterate over all registered EndpointLocators and check their regular expressions against the destination.
+ EndpointLocatorMap locatorMap;
+ mImpl->getEndpointLocatorMapCopy(locatorMap);
+ for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
+ {
+ // Test to see if the destination matches any of this entry's regular expressions.
+ for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
+ {
+ if (boost::regex_match(modifiedDestination, *reg))
+ {
+ lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << " at " << entry->first;
+ locatorsToTry.push_back(entry->second.locator);
+ }
+ }
+ }
+
+ // Create a single results collector for the AMI callbacks to reference.
+ LookupResultCollectorPtr lookupResultCollector(new LookupResultCollector(amdcallback,
+ destination,
+ mImpl->mEventPublisher,
+ locatorsToTry.size()));
+
+ // Invoke an AMI lookup on each endpointLocator that might be able to satisfy this lookup.
+ for(std::vector<EndpointLocatorPrx>::iterator locator = locatorsToTry.begin(); locator != locatorsToTry.end(); ++locator)
+ {
+ // Create our typesafe callback
+ LookupCallbackPtr callback(new LookupCallback(lookupResultCollector));
+
+ // Wrap our callback for AMI
+ Callback_EndpointLocator_lookupPtr lookupCallback =
+ newCallback_EndpointLocator_lookup(callback,
+ &LookupCallback::lookupResult,
+ &LookupCallback::fail);
+ // Start AMI invocation
+ lg(Debug) << "EndpointRegistry::lookup() invoke a lookup for " << destination;
+ (*locator)->begin_lookup(destination, lookupCallback);
+ }
+
+}
+
/**
* Register an EndpointLocator that can provide endpoints.
* @param id A unique identifier for the added EndpointLocator.
@@ -248,63 +441,6 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
}
/**
- * Returns the endpoints that match the specified destination id.
- * @param id String identifier of the the destination.
- */
-AsteriskSCF::Core::Endpoint::V1::EndpointSeq EndpointRegistry::lookup(const std::string& destination, const Ice::Current&)
-{
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
-
- lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
-
- string modifiedDestination(destination);
- if (mImpl->mScriptProcessor.get() != 0)
- {
- if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
- {
- mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
-
- lg(Error) << "lookup(): denied by confirmLookup() script.";
- return endpoints;
- }
- }
-
- EndpointLocatorMap locatorMap;
- mImpl->getEndpointLocatorMapCopy(locatorMap);
-
- for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
- {
- // Test to see if the destination matches any of this entry's regular expressions.
- for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
- {
- if (boost::regex_match(modifiedDestination, *reg))
- {
- lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << ". Calling remote lookup()";
-
- try
- {
- endpoints = entry->second.locator->lookup(modifiedDestination);
- }
- catch (const IceUtil::Exception& e)
- {
- lg(Error) << "Exception calling registered EndpointLocator for " << entry->first << " Details: " << e.what();
- }
- break;
- }
- }
- }
-
- Event::OperationResult result(Event::FAILURE);
- if (endpoints.size() > 0)
- {
- result = Event::SUCCESS;
- }
- mImpl->mEventPublisher->lookupEvent(destination, result);
-
- return endpoints;
-}
-
-/**
* Configure this object with a ScriptProcessor.
*/
void EndpointRegistry::setScriptProcessor(ScriptProcessor* scriptProcessor)
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index d85a015..775338e 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -73,7 +73,7 @@ public:
* Returns the endpoints that match the specified destination id.
* @param id String identifier of the the destination.
*/
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq lookup(const std::string& destination, const Ice::Current&);
+ virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
public:
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 8aae895..73cbc18 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -16,6 +16,8 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
+#include <boost/thread/mutex.hpp>
+
#include "RoutingServiceEventPublisher.h"
#include "logger.h"
@@ -117,6 +119,7 @@ public:
public:
Event::RoutingEventsPrx mEventTopic;
+ boost::mutex mLock;
private:
Ice::ObjectAdapterPtr mAdapter;
@@ -137,6 +140,8 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+
if (!mImpl->isInitialized())
{
return;
@@ -160,6 +165,8 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+
if (!mImpl->isInitialized())
{
return;
@@ -181,6 +188,8 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+
if (!mImpl->isInitialized())
{
return;
@@ -203,6 +212,8 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+
if (!mImpl->isInitialized())
{
return;
@@ -223,6 +234,8 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
*/
void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
{
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+
if (!mImpl->isInitialized())
{
return;
@@ -243,6 +256,8 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
*/
void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
{
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+
if (!mImpl->isInitialized())
{
return;
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index 26b54ad..b160069 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -13,15 +13,20 @@
* the GNU General Public License Version 2. See the LICENSE.txt file
* at the top of the source tree.
*/
+#include <boost/shared_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
#include "SessionRouter.h"
#include "EndpointRegistry.h"
#include "RoutingIf.h"
#include "EndpointIf.h"
#include "logger.h"
+#include "WorkQueue.h"
+using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
using namespace AsteriskSCF::Core::Endpoint::V1;
using namespace AsteriskSCF::System::Logging;
@@ -101,7 +106,8 @@ private:
size_t mMaxRetries;
size_t mRetryIntervalMilliseconds;
size_t mCounter;
-};
+
+}; // class RetryPolicy
/**
* Listener used to monitor sessions during the routing process. Primarily used to
@@ -270,7 +276,9 @@ private:
SessionSeq mSessions;
bool mTerminated;
SessionListenerPrx mListenerPrx;
-};
+
+}; // class SessionListenerImpl
+
typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
/**
@@ -353,7 +361,7 @@ public:
try
{
// Only the adapter holds a smart pointer for this servant, so this will
- // cause it to be delted.
+ // cause it to be deleted.
lg(Debug) << "Removing listener from object adapter." ;
mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
}
@@ -363,7 +371,12 @@ public:
}
}
- SessionListenerImpl* operator->()
+ SessionListenerImpl* operator->() const
+ {
+ return mSessionListener;
+ }
+
+ SessionListenerImpl* getListener() const
{
return mSessionListener;
}
@@ -371,63 +384,138 @@ public:
private:
SessionListenerImpl *mSessionListener;
Ice::ObjectAdapterPtr mAdapter;
+
+}; // class SessionListenerAllocator
+
+typedef boost::shared_ptr<SessionListenerAllocator> SessionListenerAllocatorPtr;
+/**
+ * Context required by all of the SessionRouter operations.
+ * All of the items in the SessionContext are thread-safe. (i.e. no lock required).
+ */
+struct SessionContext
+{
+public:
+ /**
+ * Constructor. The BridgeManager isn't initialized, but configured via a setter.
+ */
+ SessionContext(const Ice::ObjectAdapterPtr& adapter,
+ const EndpointRegistryPtr& registry,
+ const RoutingEventsPtr& publisher,
+ const boost::shared_ptr<WorkQueue> &mWorkQueue)
+ : adapter(adapter),
+ endpointRegistry(registry),
+ eventPublisher(publisher),
+ workQueue(workQueue)
+ {
+ }
+
+ Ice::ObjectAdapterPtr adapter;
+ EndpointRegistryPtr endpointRegistry;
+ RoutingEventsPtr eventPublisher;
+ AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
+ boost::shared_ptr<WorkQueue> workQueue;
+};
+
+/**
+ * An interface for an object that manages a collection of SessionRouterOperations.
+ */
+class OperationsManager
+{
+public:
+ virtual void finished(WorkQueue::Work *) = 0;
+ virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation) = 0;
+
+protected:
+ OperationsManager() {}
};
/**
- * Private operations and state of the SessionRouter.
+ * This is a base class for worker objects that offload SessionRouter operations
+ * to a worker thead. It implements the WorkQueue::Work
+ * interface so that it can be enqueued to a worker thread or thread pool.
*/
-class SessionRouterPriv
+template<typename T>
+class SessionRouterOperation : public WorkQueue::Work
{
public:
- SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
- mAdapter(objectAdapter),
- mEndpointRegistry(endpointRegistry),
- mEventPublisher(eventPublisher)
+ SessionRouterOperation(const T& amdCallback,
+ const SessionContext& context,
+ OperationsManager* manager,
+ const boost::function<void ()> &initialStateHandler)
+ : mInitiatorCallback(amdCallback),
+ mSessionContext(context),
+ mFinished(false),
+ mOperationsManager(manager),
+ mCurrentStateHandler(initialStateHandler)
{
}
- ~SessionRouterPriv()
+ virtual ~SessionRouterOperation()
{
}
- /**
- * Set the accessor to the bridge.
- */
- void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
- SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+ virtual void doWork()
{
- mBridgeManager = bridgeAccessor;
+ mCurrentStateHandler();
+ }
+
+ void finishWithException(const ::std::exception& e)
+ {
+ // Forward to this operation's initiator.
+ mInitiatorCallback->ice_exception(e);
+ finish();
+ }
+
+ void finishWithException()
+ {
+ mInitiatorCallback->ice_exception();
+ finish();
+ }
+
+ void finishAndSendResult()
+ {
+ mInitiatorCallback->ice_response();
+ finish();
+ }
+
+ void setLookupResult(const EndpointSeq& endpoints)
+ {
+ mLookupResult = endpoints;
+
+ // Reschedule the operation to complete. The operation will have
+ // advanced to the correct state handler.
+ mSessionContext.workQueue->enqueue(mOperationsManager->getOngoingOperationSharedPointer(this));
+ }
+
+protected: // These protected operations are utiltity functions.
+
+ void finish()
+ {
+ mFinished = true;
+ mOperationsManager->finished(this);
}
/**
- * Do a lookup of the requested endpoint.
+ * Initiate a lookup of the requested endpoint.
*/
- EndpointSeq lookupEndpoints(const std::string& destination, const Ice::Current& current)
+ void lookupEndpoints(const std::string& destination, const ::Ice::Current current)
{
- EndpointSeq endpoints;
try
{
- // Lookup the destination.
- endpoints = mEndpointRegistry->lookup(destination, current);
+ // This component's own lookup interface is implemented as AMD.
+ // We provide our override of AMD callback.
+ AMD_EndpointLocator_lookupPtr lookupCallback;
- if (endpoints.empty())
- {
- throw DestinationNotFoundException(destination);
- }
- }
- catch (const DestinationNotFoundException&)
- {
- // rethrow
- throw;
+ boost::shared_ptr<WorkQueue::Work> workPtr = mOperationsManager->getOngoingOperationSharedPointer(this);
+ lookupCallback = new LookupCallback<T>(this);
+
+ // Lookup the destination.
+ mSessionContext.endpointRegistry->lookup_async(lookupCallback, destination, current);
}
- catch (const Ice::Exception &)
+ catch (...)
{
- // Probably couldn't access the EndpointLocator of the registered channel.
- throw EndpointUnreachableException(destination);
+ finishWithException();
}
-
- return endpoints;
}
@@ -445,8 +533,7 @@ public:
catch (const Ice::Exception &e)
{
lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what();
- // TBD... probably other bridge cleanup needs to be done.
- throw;
+ finishWithException(e);
}
}
}
@@ -456,7 +543,7 @@ public:
*/
BridgePrx getBridge(SessionPrx session)
{
- BridgePrx result(0);
+ BridgePrx result;
RetryPolicy policy(5, 500);
while(policy.canRetry())
@@ -466,23 +553,26 @@ public:
result = session->getBridge();
break;
}
- catch(const Ice::ConnectionLostException&)
+ catch(const Ice::ConnectionLostException& cle)
{
if(!policy.retry())
{
lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed " << policy.maxRetries() << " retries." ;
- throw;
+ finishWithException(cle);
+ throw cle;
}
}
catch(const NotBridged& e)
{
lg(Error) << "getBridge(): session is not bridged." ;
- throw e; // rethrow
+ finishWithException(e);
+ throw e;
}
catch(const Ice::Exception& e)
{
lg(Error) << "getBridge(): Ice exception getting bridge for session:" << e.what();
- throw e; // rethrow
+ finishWithException(e);
+ throw e;
}
}
@@ -493,7 +583,7 @@ public:
* Create a session to each of a given set of endpoints, and return a collection of the
* newly added sessions.
*/
- SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+ SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination)
{
// Add a session
SessionSeq newSessions;
@@ -505,16 +595,16 @@ public:
// Create a session on the destination.
lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
- SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
+ SessionPrx destSession = sessionEndpoint->createSession(destination, mListenerManager->getListener()->getProxy());
lg(Debug) << " Session proxy: " << destSession->ice_toString() ;
- listener->addSession(destSession);
+ mListenerManager->getListener()->addSession(destSession);
newSessions.push_back(destSession);
}
catch(const Ice::Exception &exception)
{
lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
- // We may be able to reach SOME of the endpoints.
+ // We may be able to reach SOME of the endpoints.
}
}
return newSessions;
@@ -523,19 +613,33 @@ public:
/**
* Accessor for the sessions in a bridge.
* @bridge The bridge whose sessions are to be accessed.
- * @except An optional session proxy to be excluded from the list of sessions.
*/
- SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except=0)
+ SessionSeq getSessionsInBridge(const BridgePrx& bridge)
{
- SessionSeq sessions;
try
{
SessionSeq allSessions = bridge->listSessions();
+ }
+ catch(const Ice::Exception &e)
+ {
+ lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
+ finishWithException(e);
- if (except == 0)
- {
- return allSessions;
- }
+ }
+ return allSessions;
+ }
+
+ /**
+ * Accessor for the sessions in a bridge.
+ * @bridge The bridge whose sessions are to be accessed.
+ * @except Session proxy to be excluded from the list of sessions.
+ */
+ SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except)
+ {
+ SessionSeq sessions;
+ try
+ {
+ SessionSeq allSessions = bridge->listSessions();
for(SessionSeq::iterator s = allSessions.begin(); s !=allSessions.end(); ++s)
{
@@ -548,7 +652,7 @@ public:
catch(const Ice::Exception &e)
{
lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
- throw e; // rethrow
+ finishWithException(e);
}
return sessions;
}
@@ -583,165 +687,552 @@ public:
return removedSessions;
}
-public:
- Ice::ObjectAdapterPtr mAdapter;
- EndpointRegistryPtr mEndpointRegistry;
- RoutingEventsPtr mEventPublisher;
- AsteriskSCF::SmartProxy::SmartProxy<
- SessionCommunications::V1::BridgeManagerPrx> mBridgeManager;
-};
+protected:
+ T mInitiatorCallback;
+ SessionContext mSessionContext;
+ WorkQueue::PoolId mPoolId;
-SessionRouter::SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
- mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher))
-{
-}
+ bool mFinished;
+ EndpointSeq mLookupResult;
+ SessionListenerAllocatorPtr mListenerManager;
+ OperationsManager* mOperationsManager;
+ boost::function<void ()> mCurrentStateHandler; // Lightweight state machine. Current state handles doWork() for a given state.
-SessionRouter::~SessionRouter()
-{
- mImpl.reset();
-}
-
-void SessionRouter::setBridgeManager(
- const AsteriskSCF::SmartProxy::SmartProxy<
- SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
-{
- mImpl->mBridgeManager = bridgeAccessor;
-}
+}; // class SessionRouterOperation
/**
- * Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
- * TBD - Need to rework with asynch support.
+ * This is a specialization of the SessionRouterOperation to handle the
+ * routeSession() operation. This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread.
*/
-void SessionRouter::routeSession(
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
- const std::string& destination,
- const Ice::Current& current)
+class RouteSessionOperation : public SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>
{
- lg(Debug) << "routeSession() entered with destination " << destination ;
+public:
+ RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation(cb,
+ context,
+ listener,
+ boost::bind(&RouteSessionOperation::lookupState, this)),
+ mInitiatorCallback(cb),
+ mSource(source),
+ mDestination(destination),
+ mIceCurrent(current)
+ {
+ }
- if (!mImpl->mBridgeManager.initializeOnce())
+ virtual ~RouteSessionOperation()
{
- lg(Error) << "No proxy to BridgeManager. "
- "Make sure all services are running.";
- throw BridgingException(source->getEndpoint()->getId(), destination);
+ lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
}
- // Create a listener for the source to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
- SessionListenerAllocator listener(mImpl->mAdapter, source);
+private:
+ /**
+ * We start routing the session by looking up endpoint of the destination.
+ * This method represents processing in our initial state.
+ * It is executed off the worker thread.
+ */
+ void lookupState()
+ {
+ lg(Debug) << "routeSession() entered with destination " << mDestination ;
+
+ if (!mSessionContext.bridgeManager.initializeOnce())
+ {
+ lg(Error) << "No proxy to BridgeManager. "
+ "Make sure all services are running.";
+
+ finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+ return;
+ }
- // Route the destination
- lg(Debug) << "routeSession(): Routing destination " << destination;
- EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+ // Create a listener for the source to handle early termination.
+ // The wrapper we're using will remove the listener and free it when
+ // this method is left.
+ SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSource));
+ mListenerManager = listener;
- // Add a session to the endpoints.
- SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+ // Set the state to exectute once we've looked up our endpoints.
+ mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
- if (listener->getNumSessions() < 2)
+ // Lookup the destination. This will use AMI, and the callback should
+ // schedule us to execute again.
+ lg(Debug) << "routeSession(): Routing destination " << mDestination;
+ lookupEndpoints(mDestination, mIceCurrent);
+ }
+
+ /**
+ * Entering this state, the destination endpoint has been obtained. This state
+ * completes the operation by creating the bridge.
+ *
+ * This operation is the final state, and is executed off the worker thread.
+ */
+ void establishBridgeState()
+ {
+ if (mFinished)
+ {
+ return;
+ }
+
+ assert(mEndpoints.size() > 0);
+
+ // Add a session to the endpoints.
+ SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
+
+ if (mListenerManager->getListener()->getNumSessions() < 2)
+ {
+ finishWithException(SessionCreationException(mDestination));
+ return;
+ }
+
+ if (mListenerManager->getListener()->isTerminated())
+ {
+ finishWithException(SourceTerminatedPreBridgingException(mSource->getEndpoint()->getId()));
+ return;
+ }
+
+ // We're through listening, and we will probably interfere with the
+ // Bridge's functionality if we keep listening.
+ mListenerManager->getListener()->unregister();
+
+ // Create the bridge
+ BridgePrx bridge;
+ try
+ {
+ SessionSeq bridgedSessions;
+ bridgedSessions.push_back(mSource);
+
+ bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
+ bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+
+ lg(Debug) << "routeSession(): Creating bridge.";
+ bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+ }
+ catch (const Ice::Exception &e)
+ {
+ lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+
+ finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+ return;
+ }
+
+ // Forward the start to all the destinations routed to.
+ lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
+ forwardStart(newSessions);
+
+ // This operation is complete. Send AMD responses.
+ finishAndSendResult();
+ }
+
+private:
+ // Operation input params.
+ AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
+ SessionPrx mSource;
+ string mDestination;
+ ::Ice::Current mIceCurrent;
+
+ // Implementation state
+ EndpointSeq mEndpoints;
+
+}; // class RouteSessionOperation
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable
+ * by the destination param.
+ *
+ * This is a specialization of the SessionRouterOperation that handles the
+ * connectBridgedSessionsWithDestination() operation. This object is an instance
+ * of WorkQueue::Work so that it can enqueued to a worker thread.
+ */
+ class ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+{
+public:
+ ConnectBridgedSessionsWithDestinationOperation(const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation(cb,
+ context,
+ listener,
+ boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)),
+ mInitiatorCallback(cb),
+ mSessionToReplace(sessionToReplace),
+ mDestination(destination),
+ mIceCurrent(current)
{
- throw SessionCreationException(destination);
}
- if (listener->isTerminated())
+ virtual ~ConnectBridgedSessionsWithDestinationOperation()
{
- throw SourceTerminatedPreBridgingException(source->getEndpoint()->getId());
+ lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
}
- // We're through listening, and we will probably interfere with the
- // Bridge's functionality if we keep listening.
- listener->unregister();
+private:
+
+ void lookupState()
+ {
+ lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << mDestination;
+
+ mBridge = mSessionToReplace->getBridge();
+
+ mRemainingSessions = getSessionsInBridge(mBridge, mSessionToReplace);
+
+ // Create a listener for the sessions not being replaced to handle early termination.
+ // The wrapper we're using will remove the listener and free it when
+ // this method is left.
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
+ SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSessionToReplace));
+ mListenerManager = listener;
- // Create the bridge
- BridgePrx bridge;
- try
+ // Route the destination
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
+
+ // Set the state to exectute after lookup.
+ mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
+
+ // Lookup the destination. This will use AMI, and the callback should
+ // schedule us to execute again.
+ lg(Debug) << "routeSession(): Routing destination " << mDestination;
+ lookupEndpoints(mDestination, mIceCurrent);
+ }
+
+ /**
+ * Entering this state, the destination endpoint has been obtained.
+ * This operation is invoked from the worker thread, having been scheduled by
+ * the callback from lookup.
+ */
+ void establishBridgeState()
{
- SessionSeq bridgedSessions;
- bridgedSessions.push_back(source);
+ if (mFinished)
+ {
+ return;
+ }
- bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
- bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+ // Add a session
+ SessionSeq newSessions = createSessionForEndpoints(mEndpoints, mDestination);
- lg(Debug) << "routeSession(): Creating bridge.";
- bridge = mImpl->mBridgeManager->createBridge(bridgedSessions, 0);
+ if (mListenerManager->getListener()->getNumSessions() < 2)
+ {
+ lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << mDestination << " in connectBridgedSessionsWithDestination(). " ;
+ finishWithException(SessionCreationException(mDestination));
+ return;
+ }
+
+ if (mListenerManager->getListener()->isTerminated())
+ {
+ lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
+ finishWithException( SourceTerminatedPreBridgingException(mRemainingSessions[0]->getEndpoint()->getId()));
+ return;
+ }
+
+ // We're through listening, and we will probably interfere with the Bridge's functionality if
+ // we keep listening.
+ mListenerManager->getListener()->unregister();
+
+ // Modify the bridge
+ try
+ {
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << mDestination;
+ mBridge->replaceSession(mSessionToReplace, newSessions);
+ }
+ catch (const Ice::Exception &e)
+ {
+ lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
+ finishWithException(BridgingException(mRemainingSessions[0]->getEndpoint()->getId(), mDestination));
+ return;
+ }
+
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
+ forwardStart(newSessions);
+
+ // This operation is complete. Send AMD responses.
+ finishAndSendResult();
+ }
+
+private:
+ // Operation input params.
+ AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
+ SessionPrx mSessionToReplace;
+ string mDestination;
+ ::Ice::Current mIceCurrent;
+
+ // Implementation state
+ EndpointSeq mEndpoints;
+ BridgePrx mBridge;
+ SessionSeq mRemainingSessions;
+
+}; // class ConnectBridgedSessionsWithDestinationOperation
+
+/**
+ * This is a specialization of the SessionRouterOperation that handles the
+ * connectBridgedSessions() operation.
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency,
+ * This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread.
+ */
+class ConnectBridgedSessionsOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>
+{
+public:
+ ConnectBridgedSessionsOperation(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation(cb, context, listener, boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)),
+ mInitiatorCallback(cb),
+ mSessionToReplace(sessionToReplace),
+ mBridgedSession(bridgedSession),
+ mIceCurrent(current)
+ {
}
- catch (const Ice::Exception &e)
+
+ virtual ~ConnectBridgedSessionsOperation()
{
- lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+ lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
+ }
+
+private:
+ /**
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency,
+ */
+ void connectBridgedSessionsState()
+ {
+ lg(Debug) << "connectBridgedSessions() entered... ";
+
+ // Get the bridge being merged into.
+ BridgePrx mergeBridge = getBridge(mSessionToReplace);
+
+ SessionSeq preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
+
+ // Create a listener for the sessions not being replaced to handle early termination.
+ // The wrapper we're using will remove the listener and free it when
+ // this method is left.
+ lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
+ SessionListenerAllocator listener(mSessionContext.adapter, preserveSessions);
+
+ // Get the bridge for the sessions being moved.
+ BridgePrx oldBridge = getBridge(mBridgedSession);
+
+ SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
+
+ // Check for early termination by the source.
+ if (listener->isTerminated())
+ {
+ lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
+ finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
+ return;
+ }
+
+ // We're through listening, and we will probably interfere with the Bridge's functionality if
+ // we keep listening.
+ lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
listener->unregister();
- throw BridgingException(source->getEndpoint()->getId(), destination);
+ // Now replace the sessions.
+ try
+ {
+ lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
+ mergeBridge->replaceSession(mSessionToReplace, migratingSessions);
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
+ finishWithException(e); // rethrow
+ return;
+ }
+
+ // This operation is complete. Send AMD responses.
+ finishAndSendResult();
}
+private:
+ // Operation input params.
+ AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
+ SessionPrx mSessionToReplace;
+ SessionPrx mBridgedSession;
+ ::Ice::Current mIceCurrent;
- // Forward the start to all the destinations routed to.
- lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
- mImpl->forwardStart(newSessions);
+}; // class ConnectBridgedSessionsOperation
-} // SessionRouter::routeSession(...)
-/**
- * Replace one session in a Bridge with a new
- * session routable by the destination param.
- * @param source The session initiating the routing event.
- * @param destination The address or id of the destination to be routed.
+/**
+ * An implementation of the AMD_EndpointLocator_lookup callback so
+ * that we can call our own lookup operation.
+ * Note that we're not really using AMD, but we're using the same
+ * AMD implementation that other components would use to do a lookup().
*/
-void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sessionToReplace,
- const ::std::string& destination,
- const Ice::Current& current)
+template <typename T>
+class LookupCallback : public AMD_EndpointLocator_lookup
{
- lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination;
+public:
+ LookupCallback(SessionRouterOperation<T>* operation)
+ : mOperation(operation)
+ {
+ }
+
+public: // Overrides.
+
+ virtual void ice_exception(const ::std::exception& e)
+ {
+ mOperation->finishWithException(e);
+ }
+
+ virtual void ice_exception()
+ {
+ mOperation->finishWithException();
+ }
- BridgePrx bridge(sessionToReplace->getBridge());
+ virtual void ice_response(const ::AsteriskSCF::Core::Endpoint::V1::EndpointSeq& endpoints)
+ {
+ mOperation->setLookupResult(endpoints);
+ }
- SessionSeq remainingSessions = mImpl->getSessionsInBridge(bridge, sessionToReplace);
+private:
+ SessionRouterOperation<T>* mOperation;
+};
- // Create a listener for the sessions not being replaced to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
- lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
- SessionListenerAllocator listener(mImpl->mAdapter, remainingSessions);
+typedef map<WorkQueue::Work*, boost::shared_ptr<WorkQueue::Work> > OperationMap;
- // Route the destination
- lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << destination;
- EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+/**
+ * Private operations and state of the SessionRouter.
+ */
+class SessionRouterPriv : public OperationsManager
+{
+public:
+ SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter,
+ const EndpointRegistryPtr& endpointRegistry,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const boost::shared_ptr<WorkQueue>& workQueue) :
+ mSessionContext(objectAdapter,
+ endpointRegistry,
+ eventPublisher,
+ workQueue)
+ {
+ }
- // Add a session
- SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+ ~SessionRouterPriv()
+ {
+ }
- if (listener->getNumSessions() < 2)
+ /**
+ * Set the accessor to the bridge.
+ */
+ void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
+ SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
{
- lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << destination << " in connectBridgedSessionsWithDestination(). " ;
- throw SessionCreationException(destination);
+ mSessionContext.bridgeManager = bridgeAccessor;
}
- if (listener->isTerminated())
+ void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
{
- lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
- throw SourceTerminatedPreBridgingException(remainingSessions[0]->getEndpoint()->getId());
+ mOngoingOperations[work.get()] = work;
+ mSessionContext.workQueue->enqueue(work);
}
- // We're through listening, and we will probably interfere with the Bridge's functionality if
- // we keep listening.
- listener->unregister();
- // Modify the bridge
- try
+public: // Overrides
+
+ /**
+ * Handle a notice from an operation that it has completed.
+ * Remove our shared_ptr reference so that it will die.
+ */
+ virtual void finished(WorkQueue::Work* op)
{
- lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << destination;
- bridge->replaceSession(sessionToReplace, newSessions);
+ boost::lock_guard<boost::mutex> guard(mLock);
+ OperationMap::iterator kvp = mOngoingOperations.find(op);
+
+ if (kvp != mOngoingOperations.end())
+ {
+ lg(Debug) << "Removing reference to finished opeation.";
+ mOngoingOperations.erase(kvp);
+ }
}
- catch (const Ice::Exception &e)
+
+ /**
+ * The operations sometimes need a shared_ptr to themselves to hand off to callbacks or
+ * other objects being processed in other threads.
+ */
+ virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
{
- lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
- throw BridgingException(remainingSessions[0]->getEndpoint()->getId(), destination);
+ boost::lock_guard<boost::mutex> guard(mLock);
+ OperationMap::iterator kvp = mOngoingOperations.find(operation);
+
+ assert(kvp != mOngoingOperations.end());
+ return (*kvp).second;
}
- lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
- mImpl->forwardStart(newSessions);
+public:
+ SessionContext mSessionContext;
+ OperationMap mOngoingOperations;
+ boost::mutex mLock;
+};
-} // SessionRouter::connectBridgedSessionsWithDestination(...)
+SessionRouter::SessionRouter(
+ const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const boost::shared_ptr<WorkQueue>& workQueue)
+ : mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher, workQueue))
+{
+}
+SessionRouter::~SessionRouter()
+{
+ mImpl.reset();
+}
+
+void SessionRouter::setBridgeManager(
+ const AsteriskSCF::SmartProxy::SmartProxy<
+ SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+{
+ mImpl->mSessionContext.bridgeManager = bridgeAccessor;
+}
+
+/**
+ * Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
+ */
+void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current)
+{
+ WorkPtr routeSessionOp(new RouteSessionOperation(cb,
+ source,
+ destination,
+ current,
+ mImpl->mSessionContext,
+ mImpl.get()));
+
+
+ mImpl->scheduleOperation(routeSessionOp);
+}
+
+/**
+ * Replace one session in a Bridge with a new
+ * session routable by the destination param.
+ * @param source The session initiating the routing event.
+ * @param destination The address or id of the destination to be routed.
+ */
+void SessionRouter::connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& current)
+{
+ WorkPtr connectBridgedSessionsWithDestinationOp(new ConnectBridgedSessionsWithDestinationOperation(cb,
+ sessionToReplace,
+ destination,
+ current,
+ mImpl->mSessionContext,
+ mImpl.get()));
+
+ mImpl->scheduleOperation(connectBridgedSessionsWithDestinationOp);
+
+}
/**
* Replace one session in a Bridge with sessions from another bridge.
@@ -756,52 +1247,23 @@ void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sess
* their current bridge before being added to the bridge currenltly attached to
* sessionToReplace.
*/
-void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
- const SessionPrx& bridgedSession,
- const Ice::Current&)
-{
- lg(Debug) << "connectBridgedSessions() entered... ";
-
- // Get the bridge being merged into.
- BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
-
- SessionSeq preserveSessions = mImpl->getSessionsInBridge(mergeBridge, sessionToReplace);
-
- // Create a listener for the sessions not being replaced to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
- lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
- SessionListenerAllocator listener(mImpl->mAdapter, preserveSessions);
- // Get the bridge for the sessions being moved.
- BridgePrx oldBridge = mImpl->getBridge(bridgedSession);
+void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current& current)
+{
+ WorkPtr connectBridgedSessionsOp(new ConnectBridgedSessionsOperation(cb,
+ sessionToReplace,
+ bridgedSession,
+ current,
+ mImpl->mSessionContext,
+ mImpl.get()));
- SessionSeq migratingSessions = mImpl->removeSessionsFromBridge(oldBridge, bridgedSession);
+ mImpl->scheduleOperation(connectBridgedSessionsOp);
- // Check for early termination by the source.
- if (listener->isTerminated())
- {
- lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
- throw SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId());
- }
- // We're through listening, and we will probably interfere with the Bridge's functionality if
- // we keep listening.
- lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
- listener->unregister();
-
- // Now replace the sessions.
- try
- {
- lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
- mergeBridge->replaceSession(sessionToReplace, migratingSessions);
- }
- catch(const Ice::Exception& e)
- {
- lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
- throw e; // rethrow
- }
+}
-} // SessionRouter::connectBridgedSessions(...)
} // end BasicRoutingService
} // end AsteriskSCF
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index d955519..014f3ce 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -21,6 +21,7 @@
#include "SmartProxy.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
#include "EndpointRegistry.h"
+#include "WorkQueue.h"
namespace AsteriskSCF
{
@@ -34,8 +35,10 @@ class SessionRouterPriv;
class SessionRouter : public AsteriskSCF::SessionCommunications::V1::SessionRouter
{
public:
- SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher);
+ SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,
+ const EndpointRegistryPtr& endpointRegistry,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const boost::shared_ptr<WorkQueue>& sessionRouterWorkQueue);
~SessionRouter();
void setBridgeManager(
@@ -55,6 +58,11 @@ public:
const std::string& destination,
const Ice::Current&);
+ virtual void routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current&);
+
/**
* Replace a session in a bridge with a destination. The desintation will be routed.
* @param sessionToReplace The session to be replaced in a bridge. The affected Bridge interface is
@@ -65,6 +73,11 @@ public:
const ::std::string& destination,
const Ice::Current&);
+ virtual void connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& );
+
/**
* Replace a session in a bridge with another session. If the newSession is already participating in a Bridge,
* it will be removed from it's current bridge prior to be used as a replacement.
@@ -76,6 +89,11 @@ public:
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
const Ice::Current&);
+ virtual void connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current&);
+
private:
boost::shared_ptr<SessionRouterPriv> mImpl;
};
diff --git a/src/SimpleWorkQueue.cpp b/src/SimpleWorkQueue.cpp
new file mode 100644
index 0000000..f55ff82
--- /dev/null
+++ b/src/SimpleWorkQueue.cpp
@@ -0,0 +1,273 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+/**
+ * A simple Work Queue implementation. On construction, starts an internal thread.
+ * Work can be enqueued via the thread-safe enqueue() method. All work must implement
+ * the Work interface.
+ */
+#include <iostream>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
+#include <list>
+
+#include "logger.h"
+
+#include "SimpleWorkQueue.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::System::Logging;
+using namespace boost;
+
+namespace AsteriskSCF
+{
+class SimpleWorkQueuePriv
+{
+public:
+ SimpleWorkQueuePriv(const std::string& id, const Logger& logger)
+ : mLogger(logger),
+ mQid(id),
+ mInitialized(false),
+ mPaused(false), // runs by default.
+ mFinished(false),
+ mThread(boost::bind(&SimpleWorkQueuePriv::execute, this))
+
+ {
+ mLogger(Debug) << "SimpleWorkQueue::private_impl constructor called. Queue ID:" << mQid;
+ }
+
+ ~SimpleWorkQueuePriv()
+ {
+ mLogger(Debug) << "SimpleWorkQueue::private_impl desctuctor called. Queue ID:" << mQid;
+ }
+
+ WorkPtr dequeue();
+ WorkPtr waitAndDequeue();
+ void execute();
+ bool isPaused();
+
+ const Logger& mLogger;
+ std::string mQid;
+ bool mInitialized;
+ bool mFinished;
+ bool mPaused;
+ std::list<WorkPtr> mQueue;
+ boost::thread mThread;
+ boost::mutex mQueueMutex;
+ boost::condition mEmptyQueueCondition;
+ boost::mutex mPauseMutex;
+ boost::condition mPauseCondition;
+};
+}
+
+SimpleWorkQueue::SimpleWorkQueue(const std::string& qid, const Logger& logger) : mImpl(new SimpleWorkQueuePriv(qid, logger))
+{
+ mImpl->mLogger(Debug) << "SimpleWorkQueue::Constructor() called. Queue ID:" << mImpl->mQid;
+ mImpl->mInitialized = true;
+}
+
+SimpleWorkQueue::~SimpleWorkQueue()
+{
+ mImpl->mLogger(Debug) << "SimpleWorkQueue::Destructor() called. Queue ID:" << mImpl->mQid;
+ terminate();
+
+ // Wait for worker thread to shut down.
+ mImpl->mThread.join(); // If you don't do this, then the mImpl is trashed and Execute has bad "this" ptr on other thread.
+}
+
+bool SimpleWorkQueue::isRunning()
+{
+ return (mImpl->mInitialized && !mImpl->mPaused && !mImpl->mFinished);
+}
+
+/**
+ * Pause the SimpleWorkQueue's thread.
+ */
+void SimpleWorkQueue::pause()
+{
+ mImpl->mLogger(Info) << "SimpleWorkQueue::Pause called for queue " << mImpl->mQid;
+
+ boost::mutex::scoped_lock lock(mImpl->mPauseMutex);
+ mImpl->mPaused = true;
+}
+
+/**
+ * Resume from a Paused state.
+ */
+void SimpleWorkQueue::resume()
+{
+ mImpl->mLogger(Info) << "SimpleWorkQueue::Resume called for queue " << mImpl->mQid;
+
+ boost::mutex::scoped_lock lock(mImpl->mPauseMutex);
+ mImpl->mPaused = false;
+ mImpl->mPauseCondition.notify_all();
+}
+
+/**
+ * Stops this thread from executing.
+ */
+void SimpleWorkQueue::terminate()
+{
+ mImpl->mLogger(Info) << "SimpleWorkQueue::Terminate called for queue " << mImpl->mQid ;
+
+ mImpl->mFinished = true;
+ mImpl->mPaused = false;
+ mImpl->mPauseCondition.notify_all(); // In case the thread was waiting on the PauseCondition.
+ mImpl->mEmptyQueueCondition.notify_all(); // In case the thread was waiting on an EmptyQueueCondition
+}
+
+/**
+ * A convenience method to determine if there is any pending work on the queue.
+ */
+bool SimpleWorkQueue::workPending()
+{
+ return !mImpl->mQueue.empty();
+}
+
+/**
+ * Allows other thread to join to this thread. The caller needs to
+ * call this object's Terminate method, or the join will block
+ * indefinitely.
+ */
+void SimpleWorkQueue::join()
+{
+ mImpl->mThread.join();
+}
+
+static WorkQueue::PoolId mNoOpPoolId;
+
+/**
+ * Enqueue an item of work for processing on this queue's thread.
+ */
+WorkQueue::PoolId SimpleWorkQueue::enqueue(WorkPtr w)
+{
+ boost::mutex::scoped_lock lock(mImpl->mQueueMutex);
+ bool wasEmpty = mImpl->mQueue.empty();
+ mImpl->mQueue.push_back(w);
+ int size = mImpl->mQueue.size();
+ lock.unlock();
+
+ if (wasEmpty)
+ {
+ mImpl->mEmptyQueueCondition.notify_all();
+ }
+
+ return mNoOpPoolId;
+}
+
+/**
+ * This is a private no-op implementation of a work item. Returned from WaitAndDequeue
+ * if the program is Terminated while waiting on the EmptyQueueCondition.
+ */
+class NO_WORK_CLASS : public WorkQueue::Work
+{
+public:
+ NO_WORK_CLASS() {};
+ void doWork() {} // Do nothing
+};
+static shared_ptr<WorkQueue::Work> NO_WORK_PTR(new NO_WORK_CLASS());
+
+/**
+ * This method returns the next work from the queue. If no work available,
+ * this method waits on the EmptyQueueCondition.
+ */
+WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
+{
+ boost::mutex::scoped_lock lock(mQueueMutex);
+
+ int size = mQueue.size(); // debugging
+
+ while (mQueue.empty())
+ {
+ mLogger(Debug) << "SimpleWorkQueue::WaitAndDequeue: Waiting on empty queue. Queue ID:" << mQid;
+
+ if (mFinished)
+ {
+ mLogger(Info) << "SimpleWorkQueue::WaitAndDequeue: Returning the NO_WORK token. Queue ID:" << mQid;
+ return NO_WORK_PTR;
+ }
+
+ mEmptyQueueCondition.wait(lock);
+ }
+
+ mLogger(Debug) << "SimpleWorkQueue::WaitAndDequeue: Dequeuing some work. Queue ID:" << mQid;
+
+ size = mQueue.size(); // debugging
+
+ shared_ptr<WorkQueue::Work> work = mQueue.front();
+ mQueue.pop_front();
+
+ return work;
+}
+
+bool SimpleWorkQueuePriv::isPaused()
+{
+ boost::mutex::scoped_lock lock(mPauseMutex);
+ return mPaused;
+}
+
+/**
+ * This is the thread's event loop. The thread terminates when this method returns.
+ */
+void SimpleWorkQueuePriv::execute()
+{
+ while (!mInitialized)
+ {
+ mLogger(Debug) << "SimpleWorkQueue::Execute: Waiting for initialization. Queue ID:" << mQid;
+
+ // The thread can start before the constructor has finished initializing the object.
+ // Can lead to strange behavior.
+ continue;
+ }
+
+ while (!mFinished)
+ {
+ { // scope the lock
+ boost::mutex::scoped_lock lock(mPauseMutex);
+ while(mPaused)
+ {
+ mLogger(Debug) << "SimpleWorkQueue::Execute: Waiting while paused. Queue ID:" << mQid;
+
+ mPauseCondition.wait(lock);
+ }
+
+ if (mFinished) // In case Terminate was called while in PauseCondition
+ {
+ break;
+ }
+ } // end lock scope
+
+ mLogger(Debug) << "SimpleWorkQueue::Execute: Pinging the work queue. Queue ID:" << mQid;
+
+ shared_ptr<WorkQueue::Work> work = waitAndDequeue();
+
+ mLogger(Debug) << "SimpleWorkQueue::Execute: Doing the work. Queue ID:" << mQid;
+
+ try
+ {
+ work->doWork();
+ }
+ catch(...)
+ {
+ // Workers should be catching/managing their own exceptions!
+ }
+
+ } // while !mFinished
+
+ mLogger(Debug) << "SimpleWorkQueue:Execute: Exiting the thread for good. Queue ID:" << mQid;
+}
+
diff --git a/src/SimpleWorkQueue.h b/src/SimpleWorkQueue.h
new file mode 100644
index 0000000..164b1a7
--- /dev/null
+++ b/src/SimpleWorkQueue.h
@@ -0,0 +1,58 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <list>
+#include <string>
+#include <boost/thread.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include "WorkQueue.h"
+#include "logger.h"
+
+namespace AsteriskSCF
+{
+/**
+ * A simple Work Queue implementation. On construction, starts an internal thread.
+ * Work can be enqueued via the thread-safe enqueue() method. All work must implement
+ * the IWork interface of IWorkQueue.
+ * See IWorkQueue for more information.
+ */
+class SimpleWorkQueuePriv;
+
+class SimpleWorkQueue : public WorkQueue, boost::noncopyable
+{
+public:
+
+ SimpleWorkQueue(const std::string& id, const AsteriskSCF::System::Logging::Logger& logger);
+ ~SimpleWorkQueue();
+
+ virtual WorkQueue::PoolId enqueue(WorkPtr w);
+ virtual void terminate();
+ virtual void join();
+
+ // This implementation adds the concept of Pausing to the generic IWorkQueue.
+ bool isRunning();
+ bool workPending();
+ void pause();
+ void resume();
+
+private:
+ boost::shared_ptr<SimpleWorkQueuePriv> mImpl;
+};
+
+};
diff --git a/src/WorkQueue.h b/src/WorkQueue.h
new file mode 100644
index 0000000..c3a7a9a
--- /dev/null
+++ b/src/WorkQueue.h
@@ -0,0 +1,85 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include "boost/shared_ptr.hpp"
+
+namespace AsteriskSCF
+{
+/**
+ * This class defines an interface to a work queue. A work queue manages one or
+ * more processing threads, and allows work to be enqueued.
+ */
+class WorkQueue
+{
+public:
+
+ /**
+ * This is the interface for work that gets enqueued.
+ */
+ class Work
+ {
+ public:
+ virtual void doWork() = 0;
+ };
+ typedef boost::shared_ptr<Work> WorkPtr;
+
+ /**
+ * Value used as a token to identify a particualr thread.
+ * For implementations where this interface is
+ * in front of a Thread Pool implementation, this token can
+ * be used to direct future work for related items onto the same
+ * worker thread.
+ */
+ class PoolId
+ {
+ };
+
+ /**
+ * Enqueue work to be peformed.
+ */
+ virtual PoolId enqueue(WorkPtr w) = 0;
+
... 25033 lines suppressed ...
--
team/ken.hunt/route_async_routing.git
More information about the asterisk-scf-commits
mailing list