[asterisk-scf-commits] asterisk-scf/integration/routing.git branch "route_async" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Dec 23 01:34:28 UTC 2010
branch "route_async" has been created
at 749bfb7746b6be3337c6eb750c129b35993c4297 (commit)
- Log -----------------------------------------------------------------
commit 749bfb7746b6be3337c6eb750c129b35993c4297
Merge: 3f8b609 a52053b
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Dec 22 19:17:52 2010 -0600
Merge branch 'route_async' of gitdepot:team/ken.hunt/route_async_routing into route_async
Conflicts:
src/SessionRouter.cpp
diff --cc src/SessionRouter.cpp
index 2e28b94,b35bc34..fcf6eef
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@@ -963,7 -963,7 +963,6 @@@ private
// Set the state to exectute after lookup.
setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
-- // mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
// Lookup the destination. This will use AMI, and the callback should
// schedule us to execute again.
commit 3f8b609fc14db8635d6d5c917b4b28b89f0e5c0e
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Dec 22 19:11:23 2010 -0600
Routing async support.
diff --git a/src/BasicRoutingServiceApp.cpp b/src/BasicRoutingServiceApp.cpp
index 3920cc2..f246a22 100644
--- a/src/BasicRoutingServiceApp.cpp
+++ b/src/BasicRoutingServiceApp.cpp
@@ -30,6 +30,7 @@
#include "EndpointRegistry.h"
#include "RoutingAdmin.h"
#include "SessionRouter.h"
+#include "SimpleWorkQueue.h"
#include "IceLogger.h"
#include "logger.h"
@@ -56,11 +57,14 @@ namespace BasicRoutingService
class BasicRoutingServiceApp : public IceBox::Service
{
public:
- BasicRoutingServiceApp() :
- mDone(false), mInitialized(false), mRunning(false)
+ BasicRoutingServiceApp()
+ : mDone(false),
+ mInitialized(false),
+ mRunning(false),
+ mWorkQueue( new SimpleWorkQueue("SessionRouterWorkQueue", lg))
{
-
}
+
~BasicRoutingServiceApp()
{
// Smart pointers do your thing.
@@ -88,6 +92,7 @@ private:
bool mDone;
bool mInitialized;
bool mRunning;
+ boost::shared_ptr<SimpleWorkQueue> mWorkQueue;
std::string mAppName;
ServiceLocatorManagementPrx mServiceLocatorManagement;
@@ -261,7 +266,7 @@ void BasicRoutingServiceApp::initialize()
mAdapter->add(mEndpointRegistry, mCommunicator->stringToIdentity(RegistryLocatorObjectId));
// Create publish the SessionRouter interface.
- SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher));
+ SessionRouter *rawSessionRouter(new SessionRouter(mAdapter, mEndpointRegistry, mEventPublisher, mWorkQueue));
BasicSessionRouterPtr basicSessionPtr(rawSessionRouter);
mSessionRouter = basicSessionPtr;
mAdapter->add(rawSessionRouter, mCommunicator->stringToIdentity(SessionRouterObjectId));
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04c5eda..1f6833a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -19,6 +19,9 @@ asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.cpp)
asterisk_scf_component_add_file(BasicRoutingService LuaScriptProcessor.h)
asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.cpp)
asterisk_scf_component_add_file(BasicRoutingService RoutingServiceEventPublisher.h)
+asterisk_scf_component_add_file(BasicRoutingService WorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.h)
+asterisk_scf_component_add_file(BasicRoutingService SimpleWorkQueue.cpp)
asterisk_scf_component_add_ice_libraries(BasicRoutingService IceStorm)
asterisk_scf_component_add_boost_libraries(BasicRoutingService thread date_time core regex)
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 6eecc72..6340009 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -22,6 +22,7 @@
#include "ScriptProcessor.h"
#include "logger.h"
+using namespace ::AsteriskSCF::Core::Endpoint::V1;
using namespace ::AsteriskSCF::Core::Routing::V1;
using namespace ::AsteriskSCF::System::Logging;
using namespace ::AsteriskSCF::Core::Routing::V1::Event;
@@ -117,6 +118,140 @@ public:
};
/**
+ * A collector for lookup() operation AMI replies.
+ */
+class LookupResultCollector : public IceUtil::Shared
+{
+public:
+ /**
+ * Constructor.
+ * @param cb Ice callback.
+ * @param numVotes The number of times isSupported will be called.
+ */
+ LookupResultCollector(const AMD_EndpointLocator_lookupPtr& callback,
+ const std::string& destination,
+ const RoutingEventsPtr& eventPublisher,
+ int numVotes)
+ : mCallback(callback),
+ mNumVotes(numVotes),
+ mEventPublisher(eventPublisher),
+ mDestination(destination)
+ {
+ assert(mNumVotes >= 0);
+
+ if (mNumVotes == 0)
+ {
+ notifyFailed();
+ }
+ }
+
+ ~LookupResultCollector()
+ {
+ lg(Debug) << "LookupResultCollector being destroyed. ";
+ }
+
+ /**
+ * Collect results of AMI lookups from multiple EndpointLocators.
+ */
+ void collectResult(const EndpointSeq& endpoints)
+ {
+ boost::lock_guard<boost::mutex> guard(mLock);
+
+ if ((endpoints.size() > 0) && mCallback)
+ {
+ mCallback->ice_response(endpoints);
+
+ // clear the mCallback pointer so we only answer once
+ mCallback = 0;
+
+ lg(Debug) << "EndpointRegistry::lookup() found Endpoint for destination " << mDestination;
+
+ // Post event
+ mEventPublisher->lookupEvent(mDestination, Event::SUCCESS);
+ }
+
+ assert(mNumVotes > 0); // collectResult was called too many times
+
+ if (--mNumVotes == 0 && mCallback)
+ {
+ notifyFailed();
+ }
+ }
+
+ void fail(const Ice::Exception &e)
+ {
+ boost::lock_guard<boost::mutex> guard(mLock);
+
+ if (--mNumVotes == 0 && mCallback)
+ {
+ notifyFailed();
+ }
+ }
+
+ void notifyFailed()
+ {
+ DestinationNotFoundException e(mDestination);
+ mCallback->ice_exception(e);
+
+ // clear the mCallback pointer so we only answer once
+ mCallback = 0;
+
+ // Post event
+ mEventPublisher->lookupEvent(mDestination, Event::FAILURE);
+
+ lg(Debug) << "EndpointRegistry::lookup() failed to find destination " << mDestination;
+ }
+
+private:
+ boost::mutex mLock;
+ AMD_EndpointLocator_lookupPtr mCallback;
+ int mNumVotes;
+ RoutingEventsPtr mEventPublisher;
+ std::string mDestination;
+};
+typedef IceUtil::Handle<LookupResultCollector> LookupResultCollectorPtr;
+
+/**
+ * Callback with the results for EndpointLocator::lookup. This
+ * implementation simply forwards the info on to a LookupResultCollector.
+ *
+ * @see EndpointLocator::lookup
+ * @see LookupResultCollector
+ */
+class LookupCallback : public IceUtil::Shared
+{
+public:
+ LookupCallback(const LookupResultCollectorPtr& collector) :
+ mCollector(collector)
+ {
+ }
+
+ ~LookupCallback()
+ {
+ lg(Debug) << "LookupCallback being destroyed. ";
+ }
+
+ void lookupResult(const EndpointSeq& endpoints)
+ {
+ // delegation to thread safe object
+ // no lock needed
+ mCollector->collectResult(endpoints);
+ mCollector = 0;
+ }
+
+ void fail(const Ice::Exception &e)
+ {
+ mCollector->fail(e);
+ mCollector = 0;
+ }
+
+private:
+ LookupResultCollectorPtr mCollector;
+};
+typedef IceUtil::Handle<LookupCallback> LookupCallbackPtr;
+
+
+/**
* Constructor.
*/
EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const RoutingEventsPtr& eventPublisher) :
@@ -124,6 +259,74 @@ EndpointRegistry::EndpointRegistry(ScriptProcessor* scriptProcessor, const Routi
{
}
+
+/**
+ * Asynchronously looks up the endpoints that match the specified destination id; the reply is sent via the AMD callback.
+ * @param destination String identifier of the destination.
+ */
+void EndpointRegistry::lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& amdcallback,
+ const ::std::string& destination,
+ const ::Ice::Current&)
+{
+ AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
+
+ lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
+
+ string modifiedDestination(destination);
+ if (mImpl->mScriptProcessor.get() != 0)
+ {
+ if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
+ {
+ mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
+
+ lg(Error) << "lookup(): denied by confirmLookup() script.";
+ amdcallback->ice_response(endpoints);
+ return;
+ }
+ }
+
+ std::vector<EndpointLocatorPrx> locatorsToTry;
+
+ // Iterate over all registered EndpointLocators and check their regular expressions against the destination.
+ EndpointLocatorMap locatorMap;
+ mImpl->getEndpointLocatorMapCopy(locatorMap);
+ for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
+ {
+ // Test to see if the destination matches any of this entry's regular expressions.
+ for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
+ {
+ if (boost::regex_match(modifiedDestination, *reg))
+ {
+ lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << " at " << entry->first;
+ locatorsToTry.push_back(entry->second.locator);
+ }
+ }
+ }
+
+ // Create a single results collector for the AMI callbacks to reference.
+ LookupResultCollectorPtr lookupResultCollector(new LookupResultCollector(amdcallback,
+ destination,
+ mImpl->mEventPublisher,
+ locatorsToTry.size()));
+
+ // Invoke an AMI lookup on each endpointLocator that might be able to satisfy this lookup.
+ for(std::vector<EndpointLocatorPrx>::iterator locator = locatorsToTry.begin(); locator != locatorsToTry.end(); ++locator)
+ {
+ // Create our typesafe callback
+ LookupCallbackPtr callback(new LookupCallback(lookupResultCollector));
+
+ // Wrap our callback for AMI
+ Callback_EndpointLocator_lookupPtr lookupCallback =
+ newCallback_EndpointLocator_lookup(callback,
+ &LookupCallback::lookupResult,
+ &LookupCallback::fail);
+ // Start AMI invocation
+ lg(Debug) << "EndpointRegistry::lookup() invoke a lookup for " << destination;
+ (*locator)->begin_lookup(destination, lookupCallback);
+ }
+
+}
+
/**
* Register an EndpointLocator that can provide endpoints.
* @param id A unique identifier for the added EndpointLocator.
@@ -248,63 +451,6 @@ void EndpointRegistry::setEndpointLocatorDestinationIds(const std::string& locat
}
/**
- * Returns the endpoints that match the specified destination id.
- * @param id String identifier of the the destination.
- */
-AsteriskSCF::Core::Endpoint::V1::EndpointSeq EndpointRegistry::lookup(const std::string& destination, const Ice::Current&)
-{
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq endpoints;
-
- lg(Debug) << "EndpointRegistry::lookup() called for destination " << destination;
-
- string modifiedDestination(destination);
- if (mImpl->mScriptProcessor.get() != 0)
- {
- if (!mImpl->mScriptProcessor->confirmLookup(destination, modifiedDestination))
- {
- mImpl->mEventPublisher->lookupEvent(destination, Event::FAILURE);
-
- lg(Error) << "lookup(): denied by confirmLookup() script.";
- return endpoints;
- }
- }
-
- EndpointLocatorMap locatorMap;
- mImpl->getEndpointLocatorMapCopy(locatorMap);
-
- for(EndpointLocatorMapIterator entry = locatorMap.begin(); entry != locatorMap.end(); ++entry)
- {
- // Test to see if the destination matches any of this entry's regular expressions.
- for(vector<boost::regex>::iterator reg = entry->second.regexList.begin(); reg != entry->second.regexList.end(); ++reg)
- {
- if (boost::regex_match(modifiedDestination, *reg))
- {
- lg(Debug) << "EndpointRegistry::lookup() found an EndpointLocator for " << destination << ". Calling remote lookup()";
-
- try
- {
- endpoints = entry->second.locator->lookup(modifiedDestination);
- }
- catch (const IceUtil::Exception& e)
- {
- lg(Error) << "Exception calling registered EndpointLocator for " << entry->first << " Details: " << e.what();
- }
- break;
- }
- }
- }
-
- Event::OperationResult result(Event::FAILURE);
- if (endpoints.size() > 0)
- {
- result = Event::SUCCESS;
- }
- mImpl->mEventPublisher->lookupEvent(destination, result);
-
- return endpoints;
-}
-
-/**
* Configure this object with a ScriptProcessor.
*/
void EndpointRegistry::setScriptProcessor(ScriptProcessor* scriptProcessor)
diff --git a/src/EndpointRegistry.h b/src/EndpointRegistry.h
index d85a015..775338e 100644
--- a/src/EndpointRegistry.h
+++ b/src/EndpointRegistry.h
@@ -73,7 +73,7 @@ public:
* Returns the endpoints that match the specified destination id.
* @param id String identifier of the the destination.
*/
- AsteriskSCF::Core::Endpoint::V1::EndpointSeq lookup(const std::string& destination, const Ice::Current&);
+ virtual void lookup_async(const ::AsteriskSCF::Core::Routing::V1::AMD_EndpointLocator_lookupPtr& cb, const ::std::string& destination, const ::Ice::Current&);
public:
diff --git a/src/RoutingServiceEventPublisher.cpp b/src/RoutingServiceEventPublisher.cpp
index 8aae895..9bceb87 100644
--- a/src/RoutingServiceEventPublisher.cpp
+++ b/src/RoutingServiceEventPublisher.cpp
@@ -16,6 +16,8 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
+#include <boost/thread/mutex.hpp>
+
#include "RoutingServiceEventPublisher.h"
#include "logger.h"
@@ -42,6 +44,7 @@ public:
RoutingServiceEventPublisherPriv(const Ice::ObjectAdapterPtr& adapter) :
mAdapter(adapter), mInitialized(false)
{
+ boost::lock_guard<boost::mutex> lock(mLock);
initialize();
}
@@ -89,7 +92,17 @@ public:
}
Ice::ObjectPrx publisher = topic->getPublisher();
- mEventTopic = Event::RoutingEventsPrx::uncheckedCast(publisher);
+ Ice::ObjectPrx oneway;
+ try
+ {
+ oneway = publisher->ice_oneway();
+ }
+ catch (const Ice::NoEndpointException&)
+ {
+ assert(0); // All operations callable via icestorm must support oneway.
+ }
+
+ mEventTopic = Event::RoutingEventsPrx::uncheckedCast(oneway);
mInitialized = true;
}
@@ -116,7 +129,8 @@ public:
}
public:
- Event::RoutingEventsPrx mEventTopic;
+ Event::RoutingEventsPrx mEventTopic; // Using one-way proxy.
+ boost::mutex mLock;
private:
Ice::ObjectAdapterPtr mAdapter;
@@ -137,9 +151,12 @@ RoutingServiceEventPublisher::RoutingServiceEventPublisher(const Ice::ObjectAdap
void RoutingServiceEventPublisher::lookupEvent(const std::string& destination,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -160,9 +177,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
const ::AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -181,9 +201,12 @@ void RoutingServiceEventPublisher::addEndpointLocatorEvent(const std::string& lo
void RoutingServiceEventPublisher::removeEndpointLocatorEvent(const std::string& locatorId,
AsteriskSCF::Core::Routing::V1::Event::OperationResult result, const Ice::Current &)
{
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -203,9 +226,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
const AsteriskSCF::Core::Routing::V1::RegExSeq& regexList, AsteriskSCF::Core::Routing::V1::Event::OperationResult result,
const Ice::Current &)
{
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -223,9 +249,12 @@ void RoutingServiceEventPublisher::setEndpointLocatorDestinationIdsEvent(const s
*/
void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current &)
{
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
@@ -243,9 +272,12 @@ void RoutingServiceEventPublisher::clearEndpointLocatorsEvent(const Ice::Current
*/
void RoutingServiceEventPublisher::setPolicyEvent(const std::string& policy, const Ice::Current &)
{
- if (!mImpl->isInitialized())
- {
- return;
+ { // scope for the lock
+ boost::lock_guard<boost::mutex> lock(mImpl->mLock);
+ if (!mImpl->isInitialized())
+ {
+ return;
+ }
}
try
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index b50b711..2e28b94 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -13,20 +13,26 @@
* the GNU General Public License Version 2. See the LICENSE.txt file
* at the top of the source tree.
*/
+#include <boost/shared_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
#include "SessionRouter.h"
#include "EndpointRegistry.h"
#include "RoutingIf.h"
#include "EndpointIf.h"
#include "logger.h"
+#include "WorkQueue.h"
+using namespace AsteriskSCF;
using namespace AsteriskSCF::Core::Routing::V1;
using namespace AsteriskSCF::Core::Endpoint::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::Core::Routing::V1::Event;
+using namespace AsteriskSCF::Core::Routing::V1;
using namespace std;
namespace
@@ -101,7 +107,8 @@ private:
size_t mMaxRetries;
size_t mRetryIntervalMilliseconds;
size_t mCounter;
-};
+
+}; // class RetryPolicy
/**
* Listener used to monitor sessions during the routing process. Primarily used to
@@ -270,18 +277,18 @@ private:
SessionSeq mSessions;
bool mTerminated;
SessionListenerPrx mListenerPrx;
-};
+
+}; // class SessionListenerImpl
+
typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
/**
- * This class uses RAII to manage the lifecycle of a session listener.
- * It's sort of a smart pointer for the listener, but it's tightly
- * coupled to the specifics of our private impl.
+ * This class manages the lifecycle of a session listener.
*/
-class SessionListenerAllocator
+class SessionListenerManager
{
public:
- SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
+ SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
: mSessionListener(new SessionListenerImpl()),
mAdapter(adapter)
{
@@ -306,7 +313,7 @@ public:
}
}
- SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
+ SessionListenerManager(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
: mSessionListener(new SessionListenerImpl()),
mAdapter(adapter)
{
@@ -334,7 +341,7 @@ public:
}
}
- ~SessionListenerAllocator()
+ ~SessionListenerManager()
{
// Our private SessionListener implementation adds itself as a servant. It
// can't really undo that without getting itself deleted. So undo it
@@ -353,7 +360,7 @@ public:
try
{
// Only the adapter holds a smart pointer for this servant, so this will
- // cause it to be delted.
+ // cause it to be deleted.
lg(Debug) << "Removing listener from object adapter." ;
mAdapter->remove(mSessionListener->getProxy()->ice_getIdentity());
}
@@ -363,7 +370,7 @@ public:
}
}
- SessionListenerImpl* operator->()
+ SessionListenerImpl* getListener() const
{
return mSessionListener;
}
@@ -371,65 +378,191 @@ public:
private:
SessionListenerImpl *mSessionListener;
Ice::ObjectAdapterPtr mAdapter;
+
+}; // class SessionListenerManager
+
+typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
+
+/**
+ * Context required by all of the SessionRouter operations.
+ * All of the items in the SessionContext provide thread-safe interfaces. (i.e. no lock required).
+ */
+struct SessionContext
+{
+public:
+ /**
+ * Constructor. The BridgeManager isn't initialized. It's configured later via a setter
+ * due to component initialization sequence.
+ */
+ SessionContext(const Ice::ObjectAdapterPtr& adapter,
+ const EndpointRegistryPtr& registry,
+ const RoutingEventsPtr& publisher,
+ const boost::shared_ptr<WorkQueue>& workQueue)
+ : adapter(adapter),
+ endpointRegistry(registry),
+ eventPublisher(publisher),
+ workQueue(workQueue)
+ {
+ }
+
+ Ice::ObjectAdapterPtr adapter;
+ EndpointRegistryPtr endpointRegistry;
+ RoutingEventsPtr eventPublisher;
+ boost::shared_ptr<WorkQueue> workQueue;
+ AsteriskSCF::SmartProxy::SmartProxy<BridgeManagerPrx> bridgeManager;
+};
+
+/**
+ * An interface for an object that manages a collection of SessionRouterOperations.
+ */
+class OperationsManager
+{
+public:
+ /**
+ * Operations indicate they are finished by calling this method.
+ */
+ virtual void finished(WorkQueue::Work *) = 0;
+
+ /**
+ * Operations can reschedule themselves on the WorkQueue.
+ */
+ virtual void reschedule(WorkQueue::Work *) = 0;
+
+protected:
+ OperationsManager() {} // Can't construct directly. This class is an interface.
};
+// Forward-declaration
+template <typename T> class LookupCallback;
+
/**
- * Private operations and state of the SessionRouter.
+ * This is a base class for worker objects that offload SessionRouter operations
+ * to a worker thread during an AMD invocation. It implements the WorkQueue::Work
+ * interface so that it can be enqueued to a worker thread or thread pool.
+ * The template parameter T is the type of the AMD callback type for the
+ * particular operation.
*/
-class SessionRouterPriv
+template<typename T>
+class SessionRouterOperation : public WorkQueue::Work
{
public:
- SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
- mAdapter(objectAdapter),
- mEndpointRegistry(endpointRegistry),
- mEventPublisher(eventPublisher)
+ /**
+ * Constructor.
+ * @param amdCallback The callback object to provide results to the initiator of this operation.
+ * @param context The SessionContext provides references to key objects needed by each operation.
+ * @param manager The OperationsManager to notify when this operation finishes or must be rescheduled.
+ */
+ SessionRouterOperation(const T& amdCallback,
+ const SessionContext& context,
+ OperationsManager* manager,
+ const boost::function<void ()> &initialStateHandler)
+ : mInitiatorCallback(amdCallback),
+ mSessionContext(context),
+ mFinished(false),
+ mOperationsManager(manager),
+ mCurrentStateHandler(initialStateHandler)
{
}
- ~SessionRouterPriv()
+ virtual ~SessionRouterOperation()
+ {
+ }
+
+ /**
+ * An implementation of the WorkQueue::Work interface.
+ */
+ virtual void doWork()
{
+ mCurrentStateHandler();
}
/**
- * Set the accessor to the bridge.
+ * Inform initiator that this operation finished with
+ * the specified exception.
*/
- void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
- SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+ void finishWithException(const ::std::exception& e)
{
- mBridgeManager = bridgeAccessor;
+ // Forward to this operation's initiator.
+ mInitiatorCallback->ice_exception(e);
+ finish();
}
/**
- * Do a lookup of the requested endpoint.
+ * Inform initiator that this operation finished with
+ * an unspecified exception.
*/
- EndpointSeq lookupEndpoints(const std::string& destination, const Ice::Current& current)
+ void finishWithException()
{
- EndpointSeq endpoints;
- try
- {
- // Lookup the destination.
- endpoints = mEndpointRegistry->lookup(destination, current);
+ mInitiatorCallback->ice_exception();
+ finish();
+ }
- if (endpoints.empty())
- {
- throw DestinationNotFoundException(destination);
- }
- }
- catch (const DestinationNotFoundException&)
+ /**
+ * Inform initiator that this operation finished
+ * successfully by sending the (empty) AMD response.
+ */
+ void finishAndSendResult()
+ {
+ mInitiatorCallback->ice_response();
+ finish();
+ }
+
+ /**
+ * This operation is called via a LookupCallback as a result of AMI calls
+ * to the SessionManagers' EndpointLocators.
+ */
+ void setLookupResult(const EndpointSeq& endpoints)
+ {
+ mLookupResult = endpoints;
+
+ // Reschedule this operation to complete.
+ try
{
- // rethrow
- throw;
+ mOperationsManager->reschedule(this);
}
- catch (const Ice::Exception &)
+ catch(const Ice::Exception& e)
{
- // Probably couldn't access the EndpointLocator of the registered channel.
- throw EndpointUnreachableException(destination);
+ finishWithException(e);
}
+ }
+
+protected: // These protected operations are utility functions.
+
+ /**
+ * Common completion code.
+ */
+ void finish()
+ {
+ // Mark internal state as finished.
+ mFinished = true;
- return endpoints;
+ // Inform our container that we are complete.
+ mOperationsManager->finished(this);
}
+ /**
+ * Initiate a lookup of the requested endpoint.
+ * @param destination Destination to be looked up.
+ * @param current The Ice::Current reference.
+ */
+ void lookupEndpoints(const std::string& destination, const ::Ice::Current current)
+ {
+ try
+ {
+ // This component's own lookup interface is implemented as AMD.
+ // We provide our override of the appropriate AMD callback.
+ AMD_EndpointLocator_lookupPtr lookupCallback;
+
+ lookupCallback = new LookupCallback<T>(this);
+
+ // Lookup the destination.
+ mSessionContext.endpointRegistry->lookup_async(lookupCallback, destination, current);
+ }
+ catch (...)
+ {
+ finishWithException();
+ }
+ }
/**
* Forward the start() operation to all sessions in a given sequence.
@@ -445,8 +578,7 @@ public:
catch (const Ice::Exception &e)
{
lg(Error) << "Unable to forward the start() operation to session " << (*s) << " Details: " << e.what();
- // TBD... probably other bridge cleanup needs to be done.
- throw;
+ finishWithException(e);
}
}
}
@@ -456,7 +588,7 @@ public:
*/
BridgePrx getBridge(SessionPrx session)
{
- BridgePrx result(0);
+ BridgePrx result;
RetryPolicy policy(5, 500);
while(policy.canRetry())
@@ -466,23 +598,26 @@ public:
result = session->getBridge();
break;
}
- catch(const Ice::ConnectionLostException&)
+ catch(const Ice::ConnectionLostException& cle)
{
if(!policy.retry())
{
lg(Error) << "getBridge(): ConnectionLostException getting bridge for session, failed " << policy.maxRetries() << " retries." ;
- throw;
+ finishWithException(cle);
+ throw cle;
}
}
catch(const NotBridged& e)
{
lg(Error) << "getBridge(): session is not bridged." ;
- throw e; // rethrow
+ finishWithException(e);
+ throw e;
}
catch(const Ice::Exception& e)
{
lg(Error) << "getBridge(): Ice exception getting bridge for session:" << e.what();
- throw e; // rethrow
+ finishWithException(e);
+ throw e;
}
}
@@ -493,7 +628,7 @@ public:
* Create a session to each of a given set of endpoints, and return a collection of the
* newly added sessions.
*/
- SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination, SessionListenerAllocator& listener)
+ SessionSeq createSessionForEndpoints(const EndpointSeq& endpoints, const string& destination)
{
// Add a session
SessionSeq newSessions;
@@ -505,21 +640,22 @@ public:
// Create a session on the destination.
lg(Debug) << "createSessionForEndpoints(): Creating a session at destination " << destination;
- SessionPrx destSession = sessionEndpoint->createSession(destination, listener->getProxy());
+ SessionPrx destSession = sessionEndpoint->createSession(destination, mListenerManager->getListener()->getProxy());
if(!destSession)
{
lg(Debug) << " Session endpoint returned a null proxy, continuing with other endpoints";
continue;
- }
+ }
+
lg(Debug) << " Session proxy: " << destSession->ice_toString() ;
- listener->addSession(destSession);
+ mListenerManager->getListener()->addSession(destSession);
newSessions.push_back(destSession);
}
catch(const Ice::Exception &exception)
{
lg(Error) << "Unable to create session for " << destination << ". " << exception.what();
- // We may be able to reach SOME of the endpoints.
+ // We may be able to reach SOME of the endpoints.
}
}
return newSessions;
@@ -528,20 +664,35 @@ public:
/**
* Accessor for the sessions in a bridge.
* @bridge The bridge whose sessions are to be accessed.
- * @except An optional session proxy to be excluded from the list of sessions.
*/
- SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except=0)
+ SessionSeq getSessionsInBridge(const BridgePrx& bridge)
+ {
+ SessionSeq allSessions;
+ try
+ {
+ allSessions = bridge->listSessions();
+ }
+ catch(const Ice::Exception &e)
+ {
+ lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
+ finishWithException(e);
+
+ }
+ return allSessions;
+ }
+
+ /**
+ * Accessor for the sessions in a bridge.
+ * @bridge The bridge whose sessions are to be accessed.
+ * @except Session proxy to be excluded from the list of sessions.
+ */
+ SessionSeq getSessionsInBridge(const BridgePrx& bridge, const SessionPrx& except)
{
SessionSeq sessions;
try
{
SessionSeq allSessions = bridge->listSessions();
- if (except == 0)
- {
- return allSessions;
- }
-
for(SessionSeq::iterator s = allSessions.begin(); s !=allSessions.end(); ++s)
{
if (except->ice_getIdentity() != (*s)->ice_getIdentity())
@@ -553,7 +704,7 @@ public:
catch(const Ice::Exception &e)
{
lg(Error) << "Unable to get list of sessions for bridge. Throwing " << e.what();
- throw e; // rethrow
+ finishWithException(e);
}
return sessions;
}
@@ -588,225 +739,644 @@ public:
return removedSessions;
}
-public:
- Ice::ObjectAdapterPtr mAdapter;
- EndpointRegistryPtr mEndpointRegistry;
- RoutingEventsPtr mEventPublisher;
- AsteriskSCF::SmartProxy::SmartProxy<
- SessionCommunications::V1::BridgeManagerPrx> mBridgeManager;
-};
+ void setState(const boost::function<void ()>& stateHandler, std::string stateName)
+ {
+ lg(Debug) << "Operation setting new state handler " << stateName;
+ mCurrentStateHandler = stateHandler;
+ }
-SessionRouter::SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher) :
- mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher))
-{
-}
+protected:
+ T mInitiatorCallback;
+ SessionContext mSessionContext;
+ WorkQueue::PoolId mPoolId;
-SessionRouter::~SessionRouter()
-{
- mImpl.reset();
-}
+ bool mFinished;
+ EndpointSeq mLookupResult;
+ SessionListenerManagerPtr mListenerManager;
+ OperationsManager* mOperationsManager;
-void SessionRouter::setBridgeManager(
- const AsteriskSCF::SmartProxy::SmartProxy<
- SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
-{
- mImpl->mBridgeManager = bridgeAccessor;
-}
+private:
+ boost::function<void ()> mCurrentStateHandler; // Lightweight state machine. Current state handles doWork() for a given state.
+
+}; // class SessionRouterOperation
/**
- * Route the session by looking up the destination endpoint and configuring a complimentary session for the destination.
- * TBD - Need to rework with asynch support.
+ * This is a specialization of the SessionRouterOperation<T> to handle the
+ * routeSession() operation. The template parameter T is the type of the routeSession()
+ * AMD callback handler to allow this object to send results to the initiator of this
+ * operation.
+ *
+ * This object is an instance of WorkQueue::Work so that
+ * it can be enqueued to a worker thread.
*/
-void SessionRouter::routeSession(
- const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
- const std::string& destination,
- const Ice::Current& current)
+class RouteSessionOperation : public SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>
{
- lg(Debug) << "routeSession() entered with destination " << destination ;
+public:
+ RouteSessionOperation(const AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>(cb,
+ context,
+ listener,
+ boost::bind(&RouteSessionOperation::lookupState, this)),
+ mInitiatorCallback(cb),
+ mSource(source),
+ mDestination(destination),
+ mIceCurrent(current)
+ {
+ }
- if (!mImpl->mBridgeManager.initializeOnce())
+ virtual ~RouteSessionOperation()
{
- lg(Error) << "No proxy to BridgeManager. "
- "Make sure all services are running.";
- throw BridgingException(source->getEndpoint()->getId(), destination);
+ lg(Debug) << "RouteSessionOperation() being destroyed for " << mDestination ;
}
- // Create a listener for the source to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
- SessionListenerAllocator listener(mImpl->mAdapter, source);
+private:
+ /**
+ * We start routing the session by looking up the endpoint of the destination.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+ void lookupState()
+ {
+ lg(Debug) << "routeSession() entered with destination " << mDestination ;
+
+ if (!mSessionContext.bridgeManager.initializeOnce())
+ {
+ lg(Error) << "No proxy to BridgeManager. "
+ "Make sure all services are running.";
- // Route the destination
- lg(Debug) << "routeSession(): Routing destination " << destination;
- EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+ finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+ return;
+ }
- // Add a session to the endpoints.
- SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+ // Create a listener for the source to handle early termination.
+ // It is stored in mListenerManager so that it outlives this state handler;
+ // establishBridgeState() unregisters it once listening is no longer needed.
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSource));
+ mListenerManager = listener;
- if (listener->getNumSessions() < 2)
+ // Set the state handler to execute once we've looked up our endpoints.
+ setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
+
+ // Lookup the destination. This will use AMI, and the callback will
+ // schedule us to execute again.
+ lg(Debug) << "routeSession(): Routing destination " << mDestination;
+ lookupEndpoints(mDestination, mIceCurrent);
+ }
+
+ /**
+ * Entering this state, the destination endpoint has been obtained. This state
+ * completes the operation by creating the bridge.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
+ */
+ void establishBridgeState()
+ {
+ if (mFinished)
+ {
+ return;
+ }
+
+ assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed.
+ if (mLookupResult.size() < 1)
+ {
+ finishWithException(DestinationNotFoundException(mDestination));
+ return;
+ }
+
+ // Add a session to the endpoints.
+ SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
+
+ if (mListenerManager->getListener()->getNumSessions() < 2)
+ {
+ finishWithException(SessionCreationException(mDestination));
+ return;
+ }
+
+ if (mListenerManager->getListener()->isTerminated())
+ {
+ finishWithException(SourceTerminatedPreBridgingException(mSource->getEndpoint()->getId()));
+ return;
+ }
+
+ // We're through listening, and we will probably interfere with the
+ // Bridge's functionality if we keep listening.
+ mListenerManager->getListener()->unregister();
+
+ // Create the bridge
+ BridgePrx bridge;
+ try
+ {
+ SessionSeq bridgedSessions;
+ bridgedSessions.push_back(mSource);
+
+ bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
+ bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+
+ lg(Debug) << "routeSession(): Creating bridge.";
+ bridge = mSessionContext.bridgeManager->createBridge(bridgedSessions, 0);
+ }
+ catch (const Ice::Exception &e)
+ {
+ lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
+
+ finishWithException(BridgingException(mSource->getEndpoint()->getId(), mDestination));
+ return;
+ }
+
+ // Forward the start to all the destinations routed to.
+ lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
+ forwardStart(newSessions);
+
+ // This operation is complete. Send AMD responses.
+ finishAndSendResult();
+ }
+
+private:
+ // Operation input params.
+ AMD_SessionRouter_routeSessionPtr mInitiatorCallback;
+ SessionPrx mSource;
+ string mDestination;
+ ::Ice::Current mIceCurrent;
+
+}; // class RouteSessionOperation
+
+/**
+ * This operation replaces one session in a Bridge with a new session routable by the
+ * destination param. This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type
+ * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to
+ * the initiator of this operation.
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
+ */
+ class ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
+{
+public:
+ ConnectBridgedSessionsWithDestinationOperation(const AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>(cb,
+ context,
+ listener,
+ boost::bind(&ConnectBridgedSessionsWithDestinationOperation::lookupState, this)),
+ mInitiatorCallback(cb),
+ mSessionToReplace(sessionToReplace),
+ mDestination(destination),
+ mIceCurrent(current)
{
- throw SessionCreationException(destination);
}
- if (listener->isTerminated())
+ virtual ~ConnectBridgedSessionsWithDestinationOperation()
{
- throw SourceTerminatedPreBridgingException(source->getEndpoint()->getId());
+ lg(Debug) << "ConnectBridgedSessionsWithDestinationOperation() being destroyed for " << mDestination ;
}
- // We're through listening, and we will probably interfere with the
- // Bridge's functionality if we keep listening.
- listener->unregister();
+private:
- // Create the bridge
- BridgePrx bridge;
- try
+ void lookupState()
{
- SessionSeq bridgedSessions;
- bridgedSessions.push_back(source);
+ lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << mDestination;
+
+ mBridge = mSessionToReplace->getBridge();
+
+ mRemainingSessions = getSessionsInBridge(mBridge, mSessionToReplace);
+
+ // Create a listener to handle early termination. It is stored in mListenerManager so that
+ // it outlives this state handler; establishBridgeState() unregisters it.
+ // NOTE(review): it is constructed with mSessionToReplace, not mRemainingSessions -- confirm this is intended.
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
+ mListenerManager = listener;
- bridgedSessions.reserve(bridgedSessions.size() + newSessions.size());
- bridgedSessions.insert(bridgedSessions.end(), newSessions.begin(), newSessions.end());
+ // Route the destination
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << mDestination;
- lg(Debug) << "routeSession(): Creating bridge.";
- bridge = mImpl->mBridgeManager->createBridge(bridgedSessions, 0);
+ // Set the state to execute after lookup.
+ setState(boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this), "establishBridgeState");
+ // mCurrentStateHandler = boost::bind(&ConnectBridgedSessionsWithDestinationOperation::establishBridgeState, this);
+
+ // Lookup the destination. This will use AMI, and the callback should
+ // schedule us to execute again.
+ lg(Debug) << "routeSession(): Routing destination " << mDestination;
+ lookupEndpoints(mDestination, mIceCurrent);
}
- catch (const Ice::Exception &e)
+
+ /**
+ * Entering this state, the destination endpoint has been obtained.
+ * This operation is invoked from the worker thread, having been scheduled by
+ * the callback from lookup.
+ */
+ void establishBridgeState()
{
- lg(Debug) << "routeSession(): Exception creating bridge: " << e.what();
- listener->unregister();
+ if (mFinished)
+ {
+ return;
+ }
+
+ assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed.
+ if (mLookupResult.size() < 1)
+ {
+ finishWithException(DestinationNotFoundException(mDestination));
+ return;
+ }
+
+ // Add a session
+ SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
+
+ if (mListenerManager->getListener()->getNumSessions() < 2)
+ {
+ lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << mDestination << " in connectBridgedSessionsWithDestination(). " ;
+ finishWithException(SessionCreationException(mDestination));
+ return;
+ }
+
+ if (mListenerManager->getListener()->isTerminated())
+ {
+ lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
+ finishWithException( SourceTerminatedPreBridgingException(mRemainingSessions[0]->getEndpoint()->getId()));
+ return;
+ }
+
+ // We're through listening, and we will probably interfere with the Bridge's functionality if
+ // we keep listening.
+ mListenerManager->getListener()->unregister();
+
+ // Modify the bridge
+ try
+ {
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << mDestination;
+ mBridge->replaceSession(mSessionToReplace, newSessions);
+ }
+ catch (const Ice::Exception &e)
+ {
+ lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
+ finishWithException(BridgingException(mRemainingSessions[0]->getEndpoint()->getId(), mDestination));
+ return;
+ }
+
+ lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
+ forwardStart(newSessions);
- throw BridgingException(source->getEndpoint()->getId(), destination);
+ // This operation is complete. Send AMD responses.
+ finishAndSendResult();
}
+private:
+ // Operation input params.
+ AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr mInitiatorCallback;
+ SessionPrx mSessionToReplace;
+ string mDestination;
+ ::Ice::Current mIceCurrent;
- // Forward the start to all the destinations routed to.
- lg(Debug) << "routeSession(): Sending start() to newly routed destination.";
- mImpl->forwardStart(newSessions);
+ // Implementation state
+ BridgePrx mBridge;
+ SessionSeq mRemainingSessions;
-} // SessionRouter::routeSession(...)
+}; // class ConnectBridgedSessionsWithDestinationOperation
/**
- * Replace one session in a Bridge with a new
- * session routable by the destination param.
- * @param source The session initiating the routing event.
- * @param destination The address or id of the destination to be routed.
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of
+ * the connectBridgedSessions() AMD callback handler to allow this object to send results to
+ * the initiator of this operation.
+ *
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread.
*/
-void SessionRouter::connectBridgedSessionsWithDestination(const SessionPrx& sessionToReplace,
- const ::std::string& destination,
- const Ice::Current& current)
+class ConnectBridgedSessionsOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>
{
- lg(Debug) << "connectBridgedSessionsWithDestination() entered with destination " << destination;
+public:
+ ConnectBridgedSessionsOperation(const AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current& current,
+ const SessionContext& context,
+ OperationsManager* const listener)
+ : SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>(cb,
+ context,
+ listener,
+ boost::bind(&ConnectBridgedSessionsOperation::connectBridgedSessionsState, this)),
+ mInitiatorCallback(cb),
+ mSessionToReplace(sessionToReplace),
+ mBridgedSession(bridgedSession),
+ mIceCurrent(current)
+ {
+ }
+
+ virtual ~ConnectBridgedSessionsOperation()
+ {
+ lg(Debug) << "ConnectBridgedSessionsOperation() being destroyed." ;
+ }
+
+private:
+ /**
+ * Replace one session in a Bridge with sessions from another bridge.
+ */
+ void connectBridgedSessionsState()
+ {
+ lg(Debug) << "connectBridgedSessions() entered... ";
- BridgePrx bridge(sessionToReplace->getBridge());
+ // Get the bridge being merged into.
+ BridgePrx mergeBridge = getBridge(mSessionToReplace);
- SessionSeq remainingSessions = mImpl->getSessionsInBridge(bridge, sessionToReplace);
+ SessionSeq preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
- // Create a listener for the sessions not being replaced to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
- lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
- SessionListenerAllocator listener(mImpl->mAdapter, remainingSessions);
+ // Create a listener for the sessions not being replaced to handle early termination.
+ lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+ mListenerManager = listener;
- // Route the destination
- lg(Debug) << "connectBridgedSessionsWithDestination(): Routing destination " << destination;
- EndpointSeq endpoints = mImpl->lookupEndpoints(destination, current);
+ // Get the bridge for the sessions being moved.
+ BridgePrx oldBridge = getBridge(mBridgedSession);
- // Add a session
- SessionSeq newSessions = mImpl->createSessionForEndpoints(endpoints, destination, listener);
+ SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
- if (listener->getNumSessions() < 2)
+ // Check for early termination by the source.
+ if (mListenerManager->getListener()->isTerminated())
+ {
+ lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
+ finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
+ return;
+ }
+
+ // We're through listening, and we will probably interfere with the Bridge's functionality if
+ // we keep listening.
+ lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
+ mListenerManager->getListener()->unregister();
+
+ // Now replace the sessions.
+ try
+ {
+ lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
+ mergeBridge->replaceSession(mSessionToReplace, migratingSessions);
+ }
+ catch(const Ice::Exception& e)
+ {
+ lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
+ finishWithException(e); // finish the operation with the caught exception instead of rethrowing
+ return;
+ }
+
+ // This operation is complete. Send AMD responses.
+ finishAndSendResult();
+ }
+
+private:
+ // Operation input params.
+ AMD_SessionRouter_connectBridgedSessionsPtr mInitiatorCallback;
+ SessionPrx mSessionToReplace;
+ SessionPrx mBridgedSession;
+ ::Ice::Current mIceCurrent;
+
+}; // class ConnectBridgedSessionsOperation
+
+
+/**
+ * An implementation of the AMD_EndpointLocator_lookup callback so
+ * that we can call our own lookup operation.
+ * Note that we're not really dispatching via AMD, but we're using the same
+ * AMD implementation that other components would use to do a lookup().
+ */
+template <typename T>
+class LookupCallback : public AMD_EndpointLocator_lookup
+{
+public:
+ LookupCallback(SessionRouterOperation<T>* operation)
+ : mOperation(operation)
{
- lg(Error) << "connectBridgedSessionsWithDestination(): Unable to create a new session for destination " << destination << " in connectBridgedSessionsWithDestination(). " ;
- throw SessionCreationException(destination);
}
- if (listener->isTerminated())
+ ~LookupCallback()
{
- lg(Notice) << "connectBridgedSessionsWithDestination(): Source ended session before transfer in connectBridgedSessionsWithDestination(). " ;
- throw SourceTerminatedPreBridgingException(remainingSessions[0]->getEndpoint()->getId());
+ lg(Debug) << "LookupCallback destroyed.";
}
- // We're through listening, and we will probably interfere with the Bridge's functionality if
- // we keep listening.
- listener->unregister();
- // Modify the bridge
- try
+public: // Overrides.
+
+ virtual void ice_exception(const ::std::exception& e)
{
- lg(Debug) << "connectBridgedSessionsWithDestination(): Replacing session with newly routed destination " << destination;
- bridge->replaceSession(sessionToReplace, newSessions);
+ mOperation->finishWithException(e);
}
- catch (const Ice::Exception &e)
+
+ virtual void ice_exception()
{
- lg(Error) << "connectBridgedSessionsWithDestination(): Exception replacing the session in connectBridgedSessionsWithDestination. " << e.what() ;
- throw BridgingException(remainingSessions[0]->getEndpoint()->getId(), destination);
+ mOperation->finishWithException();
}
- lg(Debug) << "connectBridgedSessionsWithDestination(): Forwarding start() to new session.";
- mImpl->forwardStart(newSessions);
+ virtual void ice_response(const ::AsteriskSCF::Core::Endpoint::V1::EndpointSeq& endpoints)
+ {
+ mOperation->setLookupResult(endpoints);
+ }
-} // SessionRouter::connectBridgedSessionsWithDestination(...)
+private:
+ SessionRouterOperation<T>* mOperation;
+};
+typedef map<WorkQueue::Work*, boost::shared_ptr<WorkQueue::Work> > OperationMap;
/**
- * Replace one session in a Bridge with sessions from another bridge.
- * No routing is actually performed. This operation exists here for consistency,
- * since connectBridgedSessionsWithDestination(...) is implemented by this interface.
- * @param sessionToReplace The session that is to be replaced in a
- * bridge. The bridge obejct associated with this session will survive, and
- * all sessions bridged to this session will be kept in the bridge.
- * @param bridgedSession This session is assumed to be bridged to the sessions
- * that are to be moved to another bridge. The bridgedSession itself will not
- * be connected to the other bridge. The sessions being moved will be removed from
- * their current bridge before being added to the bridge currenltly attached to
- * sessionToReplace.
+ * Private operations and state of the SessionRouter.
*/
-void SessionRouter::connectBridgedSessions(const SessionPrx& sessionToReplace,
- const SessionPrx& bridgedSession,
- const Ice::Current&)
+class SessionRouterPriv : public OperationsManager
{
- lg(Debug) << "connectBridgedSessions() entered... ";
+public:
+ SessionRouterPriv(const Ice::ObjectAdapterPtr& objectAdapter,
+ const EndpointRegistryPtr& endpointRegistry,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const boost::shared_ptr<WorkQueue>& workQueue) :
+ mSessionContext(objectAdapter,
+ endpointRegistry,
+ eventPublisher,
+ workQueue)
+ {
+ }
- // Get the bridge being merged into.
- BridgePrx mergeBridge = mImpl->getBridge(sessionToReplace);
+ ~SessionRouterPriv()
+ {
+ }
- SessionSeq preserveSessions = mImpl->getSessionsInBridge(mergeBridge, sessionToReplace);
+ /**
+ * Set the accessor to the bridge.
+ */
+ void setBridgeAccessor(const AsteriskSCF::SmartProxy::SmartProxy<
+ SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+ {
+ mSessionContext.bridgeManager = bridgeAccessor;
+ }
- // Create a listener for the sessions not being replaced to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
- lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
- SessionListenerAllocator listener(mImpl->mAdapter, preserveSessions);
+ /**
+ * Enqueue the work to the WorkQueue.
+ */
+ void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
+ {
+ // Maintain refs to all ongoing operations.
+ mOngoingOperations[work.get()] = work;
- // Get the bridge for the sessions being moved.
- BridgePrx oldBridge = mImpl->getBridge(bridgedSession);
+ // Enqueue work.
+ mSessionContext.workQueue->enqueue(work);
+ }
- SessionSeq migratingSessions = mImpl->removeSessionsFromBridge(oldBridge, bridgedSession);
+public: // Overrides
- // Check for early termination by the source.
- if (listener->isTerminated())
+ /**
+ * Handle a notice from an operation that it has completed.
+ * Remove our shared_ptr reference so that it will die.
+ */
+ virtual void finished(WorkQueue::Work* op)
{
- lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
- throw SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId());
+ boost::lock_guard<boost::mutex> guard(mLock);
+ OperationMap::iterator kvp = mOngoingOperations.find(op);
+
+ if (kvp != mOngoingOperations.end())
+ {
+ lg(Debug) << "Removing reference to finished opeation.";
+ mOngoingOperations.erase(kvp);
+ }
}
- // We're through listening, and we will probably interfere with the Bridge's functionality if
- // we keep listening.
- lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
- listener->unregister();
- // Now replace the sessions.
- try
+ /**
+ * Handle an operation's need to reschedule itself.
+ * The operation doesn't have a shared_ptr to itself, so
+ * it can't do it internally.
+ */
+ virtual void reschedule(WorkQueue::Work *op)
{
- lg(Debug) << "connectBridgedSessions(): Asking bridge to replace sessions." ;
- mergeBridge->replaceSession(sessionToReplace, migratingSessions);
+ mSessionContext.workQueue->enqueue(getOngoingOperationSharedPointer(op));
}
- catch(const Ice::Exception& e)
+
+private:
+ /**
+ * Find our shared_ptr for a given Work object raw pointer.
+ */
+ boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
{
- lg(Error) << "connectBridgedSessions(): Unable to replace session for bridge in connectBridgedSessions(). " ;
- throw e; // rethrow
+ boost::lock_guard<boost::mutex> guard(mLock);
+ OperationMap::iterator kvp = mOngoingOperations.find(operation);
+
+ assert(kvp != mOngoingOperations.end());
+ if (kvp == mOngoingOperations.end())
+ {
+ throw Ice::UnknownException("SessionRouterPriv: Failed finding shared_ptr for SessionRouter operation.", 1);
+ }
+
+ return (*kvp).second;
}
-} // SessionRouter::connectBridgedSessions(...)
+public:
+ SessionContext mSessionContext;
+ OperationMap mOngoingOperations;
+ boost::mutex mLock;
+};
+
+/**
+ * The SessionRouter implementation.
+ */
+SessionRouter::SessionRouter(
+ const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const boost::shared_ptr<WorkQueue>& workQueue)
+ : mImpl(new SessionRouterPriv(objectAdapter, endpointRegistry, eventPublisher, workQueue))
+{
+}
+
+SessionRouter::~SessionRouter()
+{
+ mImpl.reset();
+}
+
+/**
+ * The BridgeManager proxy can only be obtained once our object adapter is activated, so our
+ * bridgeManager reference's initialization is deferred.
+ */
+void SessionRouter::setBridgeManager(
+ const AsteriskSCF::SmartProxy::SmartProxy<
+ SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
+{
+ mImpl->mSessionContext.bridgeManager = bridgeAccessor;
+}
+
+/**
+ * Route the session by looking up the destination endpoint and configuring a complementary session for the destination.
+ */
+void SessionRouter::routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current& current)
+{
+ WorkPtr routeSessionOp(new RouteSessionOperation(cb,
+ source,
+ destination,
+ current,
+ mImpl->mSessionContext,
+ mImpl.get()));
+
+
+ mImpl->scheduleOperation(routeSessionOp);
+}
+
+/**
+ * Replace one session in a Bridge with a new
+ * session routable by the destination param.
+ * @param sessionToReplace The session to be replaced in its bridge.
+ * @param destination The address or id of the destination to be routed.
+ */
+void SessionRouter::connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& current)
+{
+ WorkPtr connectBridgedSessionsWithDestinationOp(new ConnectBridgedSessionsWithDestinationOperation(cb,
+ sessionToReplace,
+ destination,
+ current,
+ mImpl->mSessionContext,
+ mImpl.get()));
+
+ mImpl->scheduleOperation(connectBridgedSessionsWithDestinationOp);
+
+}
+
+/**
+ * Replace one session in a Bridge with sessions from another bridge.
+ * No routing is actually performed. This operation exists here for consistency,
+ * since connectBridgedSessionsWithDestination(...) is implemented by this interface.
+ * @param sessionToReplace The session that is to be replaced in a
+ * bridge. The bridge object associated with this session will survive, and
+ * all sessions bridged to this session will be kept in the bridge.
+ * @param bridgedSession This session is assumed to be bridged to the sessions
+ * that are to be moved to another bridge. The bridgedSession itself will not
+ * be connected to the other bridge. The sessions being moved will be removed from
+ * their current bridge before being added to the bridge currently attached to
+ * sessionToReplace.
+ */
+
+void SessionRouter::connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current& current)
+{
+ WorkPtr connectBridgedSessionsOp(new ConnectBridgedSessionsOperation(cb,
+ sessionToReplace,
+ bridgedSession,
+ current,
+ mImpl->mSessionContext,
+ mImpl.get()));
+
+ mImpl->scheduleOperation(connectBridgedSessionsOp);
+
+}
+
} // end BasicRoutingService
} // end AsteriskSCF
diff --git a/src/SessionRouter.h b/src/SessionRouter.h
index d955519..014f3ce 100644
--- a/src/SessionRouter.h
+++ b/src/SessionRouter.h
@@ -21,6 +21,7 @@
#include "SmartProxy.h"
#include "SessionCommunications/SessionCommunicationsIf.h"
#include "EndpointRegistry.h"
+#include "WorkQueue.h"
namespace AsteriskSCF
{
@@ -34,8 +35,10 @@ class SessionRouterPriv;
class SessionRouter : public AsteriskSCF::SessionCommunications::V1::SessionRouter
{
public:
- SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
- const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher);
+ SessionRouter(const Ice::ObjectAdapterPtr& objectAdapter,
+ const EndpointRegistryPtr& endpointRegistry,
+ const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
+ const boost::shared_ptr<WorkQueue>& sessionRouterWorkQueue);
~SessionRouter();
void setBridgeManager(
@@ -55,6 +58,11 @@ public:
const std::string& destination,
const Ice::Current&);
+ virtual void routeSession_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_routeSessionPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const ::std::string& destination,
+ const ::Ice::Current&);
+
/**
* Replace a session in a bridge with a destination. The desintation will be routed.
* @param sessionToReplace The session to be replaced in a bridge. The affected Bridge interface is
@@ -65,6 +73,11 @@ public:
const ::std::string& destination,
const Ice::Current&);
+ virtual void connectBridgedSessionsWithDestination_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::std::string& destination,
+ const ::Ice::Current& );
+
/**
* Replace a session in a bridge with another session. If the newSession is already participating in a Bridge,
* it will be removed from it's current bridge prior to be used as a replacement.
@@ -76,6 +89,11 @@ public:
const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
const Ice::Current&);
+ virtual void connectBridgedSessions_async(const ::AsteriskSCF::SessionCommunications::V1::AMD_SessionRouter_connectBridgedSessionsPtr& cb,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionToReplace,
+ const ::AsteriskSCF::SessionCommunications::V1::SessionPrx& bridgedSession,
+ const ::Ice::Current&);
+
private:
boost::shared_ptr<SessionRouterPriv> mImpl;
};
diff --git a/src/SimpleWorkQueue.cpp b/src/SimpleWorkQueue.cpp
new file mode 100644
index 0000000..fc33deb
--- /dev/null
+++ b/src/SimpleWorkQueue.cpp
@@ -0,0 +1,275 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+/**
+ * A simple Work Queue implementation. On construction, starts an internal thread.
+ * Work can be enqueued via the thread-safe enqueue() method. All work must implement
+ * the Work interface.
+ */
+#include <iostream>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
+#include <list>
+
+#include "logger.h"
+
+#include "SimpleWorkQueue.h"
+
+using namespace AsteriskSCF;
+using namespace AsteriskSCF::System::Logging;
+using namespace boost;
+
+namespace AsteriskSCF
+{
+/**
+ * Private implementation state for SimpleWorkQueue.
+ *
+ * Constructing this object starts the worker thread, which runs execute().
+ * The logger is held by reference, so it must outlive this object.
+ */
+class SimpleWorkQueuePriv
+{
+public:
+    /**
+     * @param id     Identifier of the queue; used in log output.
+     * @param logger Logger to write to. Stored by reference, not copied.
+     */
+    SimpleWorkQueuePriv(const std::string& id, const Logger& logger)
+        : mLogger(logger),
+          mQid(id),
+          mInitialized(false),
+          mFinished(false),
+          mPaused(false), // runs by default.
+          mThread(boost::bind(&SimpleWorkQueuePriv::execute, this))
+    {
+        mLogger(Debug) << "SimpleWorkQueue::private_impl constructor called. Queue ID:" << mQid;
+    }
+
+    ~SimpleWorkQueuePriv()
+    {
+        mLogger(Debug) << "SimpleWorkQueue::private_impl desctuctor called. Queue ID:" << mQid;
+    }
+
+    WorkPtr dequeue();
+    WorkPtr waitAndDequeue();
+    void execute();
+    bool isPaused();
+
+    const Logger& mLogger;
+    std::string mQid;
+    bool mInitialized; // Set to true by the SimpleWorkQueue constructor.
+    bool mFinished;    // Set by SimpleWorkQueue::terminate().
+    bool mPaused;      // Toggled by SimpleWorkQueue::pause() / resume().
+    std::list<WorkPtr> mQueue;
+    boost::mutex mQueueMutex;
+    boost::condition mEmptyQueueCondition;
+    boost::mutex mPauseMutex;
+    boost::condition mPauseCondition;
+
+    // Keep this member LAST. Members are initialized in declaration order, and
+    // constructing the boost::thread immediately starts execute() on the new
+    // thread. Declaring it after the queue, mutexes and conditions guarantees
+    // they are fully constructed before the worker can touch them.
+    boost::thread mThread;
+};
+}
+
+/**
+ * Builds the private implementation (whose constructor launches the worker
+ * thread) and then flags the queue as initialized.
+ *
+ * @param qid    Identifier of the queue; used in log output.
+ * @param logger Logger handed through to the private implementation.
+ */
+SimpleWorkQueue::SimpleWorkQueue(const std::string& qid, const Logger& logger)
+    : mImpl(new SimpleWorkQueuePriv(qid, logger))
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+
+    impl.mLogger(Debug) << "SimpleWorkQueue::Constructor() called. Queue ID:" << impl.mQid;
+    impl.mInitialized = true;
+}
+
+/**
+ * Requests termination of the worker thread and blocks until it has exited.
+ */
+SimpleWorkQueue::~SimpleWorkQueue()
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+    impl.mLogger(Debug) << "SimpleWorkQueue::Destructor() called. Queue ID:" << impl.mQid;
+
+    // Ask the worker to stop and wake it if it is blocked on a condition.
+    terminate();
+
+    // The join is mandatory: without it the private implementation would be
+    // destroyed while execute() is still running on the other thread, leaving
+    // it with a dangling "this" pointer.
+    impl.mThread.join();
+}
+
+/**
+ * @return true only when the queue has been initialized and is neither
+ *         paused nor finished.
+ */
+bool SimpleWorkQueue::isRunning()
+{
+    if (!mImpl->mInitialized)
+    {
+        return false;
+    }
+
+    if (mImpl->mPaused)
+    {
+        return false;
+    }
+
+    return !mImpl->mFinished;
+}
+
+/**
+ * Put the queue into the paused state.
+ * The paused flag is set while holding the pause mutex.
+ */
+void SimpleWorkQueue::pause()
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+    impl.mLogger(Info) << "SimpleWorkQueue::Pause called for queue " << impl.mQid;
+
+    boost::lock_guard<boost::mutex> pauseGuard(impl.mPauseMutex);
+    impl.mPaused = true;
+}
+
+/**
+ * Leave the paused state.
+ * Clears the paused flag under the pause mutex and wakes every thread that
+ * is waiting on the pause condition.
+ */
+void SimpleWorkQueue::resume()
+{
+    SimpleWorkQueuePriv& impl = *mImpl;
+    impl.mLogger(Info) << "SimpleWorkQueue::Resume called for queue " << impl.mQid;
+
+    boost::lock_guard<boost::mutex> pauseGuard(impl.mPauseMutex);
+    impl.mPaused = false;
+    impl.mPauseCondition.notify_all();
+}
+
+/**
+ * Stops this thread from executing.
+ *
+ * Marks the queue as finished, clears the paused flag, and wakes any thread
+ * blocked on the pause condition or the empty-queue condition. This does not
+ * wait for the worker thread to exit; the destructor joins it.
+ */
+void SimpleWorkQueue::terminate()
+{
+    mImpl->mLogger(Info) << "SimpleWorkQueue::Terminate called for queue " << mImpl->mQid ;
+
+    {
+        // pause() and resume() change mPaused while holding mPauseMutex; do the
+        // same here. Updating the flags and notifying under the mutex prevents a
+        // waiter from testing the flags, missing this notification, and then
+        // blocking forever (which would also hang the destructor's join()).
+        boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
+        mImpl->mFinished = true;
+        mImpl->mPaused = false;
+        mImpl->mPauseCondition.notify_all(); // In case the thread was waiting on the PauseCondition.
+    }
+
+    // NOTE(review): presumably the worker waits on mEmptyQueueCondition with
+    // mQueueMutex and re-checks mFinished after waking -- confirm in
+    // waitAndDequeue()/execute().
+    mImpl->mEmptyQueueCondition.notify_all(); // In case the thread was waiting on an EmptyQueueCondition
+}
+
+/**
+ * A convenience method to determine if there is any pending work on the queue.
+ */
... 4342 lines suppressed ...
--
asterisk-scf/integration/routing.git
More information about the asterisk-scf-commits
mailing list