[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Dec 22 22:46:48 UTC 2010
branch "route_async" has been updated
via a0aae3bb0aac7731d1616b0146fc8d1196381c90 (commit)
from fbb061f636eff41df0a87504b6fbb7cde81d6b37 (commit)
Summary of changes:
src/EndpointRegistry.cpp | 2 +-
src/SessionRouter.cpp | 199 +++++++++++++++++++++++++++++++++-------------
src/SimpleWorkQueue.cpp | 27 ++++---
src/SimpleWorkQueue.h | 4 +-
src/WorkQueue.h | 4 +-
5 files changed, 163 insertions(+), 73 deletions(-)
- Log -----------------------------------------------------------------
commit a0aae3bb0aac7731d1616b0146fc8d1196381c90
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Dec 22 16:46:38 2010 -0600
Minor code cleanup for review.
diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 8c9ba89..6340009 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -190,7 +190,7 @@ public:
void notifyFailed()
{
- DestinationNotFoundException e;
+ DestinationNotFoundException e(mDestination);
mCallback->ice_exception(e);
// clear the mCallback pointer so we only answer once
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index fb10b26..18ba7e7 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -32,6 +32,7 @@ using namespace AsteriskSCF::Core::Endpoint::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::Core::Routing::V1::Event;
+using namespace AsteriskSCF::Core::Routing::V1;
using namespace std;
namespace
@@ -282,14 +283,12 @@ private:
typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
/**
- * This class uses RAII to manage the lifecycle of a session listener.
- * It's sort of a smart pointer for the listener, but it's tightly
- * coupled to the specifics of our private impl.
+ * This class manages the lifecycle of a session listener.
*/
-class SessionListenerAllocator
+class SessionListenerManager
{
public:
- SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
+ SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
: mSessionListener(new SessionListenerImpl()),
mAdapter(adapter)
{
@@ -314,7 +313,7 @@ public:
}
}
- SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
+ SessionListenerManager(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
: mSessionListener(new SessionListenerImpl()),
mAdapter(adapter)
{
@@ -342,7 +341,7 @@ public:
}
}
- ~SessionListenerAllocator()
+ ~SessionListenerManager()
{
// Our private SessionListener implementation adds itself as a servant. It
// can't really undo that without getting itself deleted. So undo it
@@ -371,11 +370,6 @@ public:
}
}
- SessionListenerImpl* operator->() const
- {
- return mSessionListener;
- }
-
SessionListenerImpl* getListener() const
{
return mSessionListener;
@@ -385,12 +379,13 @@ private:
SessionListenerImpl *mSessionListener;
Ice::ObjectAdapterPtr mAdapter;
-}; // class SessionListenerAllocator
+}; // class SessionListenerManager
+
+typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
-typedef boost::shared_ptr<SessionListenerAllocator> SessionListenerAllocatorPtr;
/**
* Context required by all of the SessionRouter operations.
- * All of the items in the SessionContext are thread-safe. (i.e. no lock required).
+ * All of the items in the SessionContext provide thread-safe interfaces. (i.e. no lock required).
*/
struct SessionContext
{
@@ -423,22 +418,37 @@ public:
class OperationsManager
{
public:
+ /**
+ * Operations indicate they are finished by calling this method.
+ */
virtual void finished(WorkQueue::Work *) = 0;
- virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation) = 0;
+
+ /**
+ * Operations can reschedule themselves on the WorkQueue.
+ */
+ virtual void reschedule(WorkQueue::Work *) = 0;
protected:
- OperationsManager() {}
+ OperationsManager() {} // Can't construct directly. This class is an interface.
};
/**
* This is a base class for worker objects that offload SessionRouter operations
- * to a worker thead. It implements the WorkQueue::Work
+ * to a worker thead during an AMD invocation. It implements the WorkQueue::Work
* interface so that it can be enqueued to a worker thread or thread pool.
+ * The template parameter T is the type of the AMD callback type for the
+ * particular operation.
*/
template<typename T>
class SessionRouterOperation : public WorkQueue::Work
{
public:
+ /**
+ * Constructor.
+ * @param amdCallback The callback object to provide results to the initiator of this operation.
+ * @param context The SessionContext provides references to key objects needed by each operation.
+ * @param manager
+ */
SessionRouterOperation(const T& amdCallback,
const SessionContext& context,
OperationsManager* manager,
@@ -455,11 +465,18 @@ public:
{
}
+ /**
+ * An implementation of the WorkQueue::Work interface.
+ */
virtual void doWork()
{
mCurrentStateHandler();
}
+ /**
+ * Inform initiator that this operation finished with
+ * the specified exception.
+ */
void finishWithException(const ::std::exception& e)
{
// Forward to this operation's initiator.
@@ -467,47 +484,72 @@ public:
finish();
}
+ /**
+ * Inform initiator that this operation finished with
+ * an unspecified exception.
+ */
void finishWithException()
{
mInitiatorCallback->ice_exception();
finish();
}
+ /**
+ * Inform initiator that this operation finished with
+ * an unspecified exception.
+ */
void finishAndSendResult()
{
mInitiatorCallback->ice_response();
finish();
}
+ /**
+ * This operation is called via a LookupCallback as a result of AMI calls
+ * to the SessionManagers' EndpointLocators.
+ */
void setLookupResult(const EndpointSeq& endpoints)
{
mLookupResult = endpoints;
- // Reschedule the operation to complete. The operation will have
- // advanced to the correct state handler.
- mSessionContext.workQueue->enqueue(mOperationsManager->getOngoingOperationSharedPointer(this));
+ // Reschedule this operation to complete.
+ try
+ {
+ mOperationsManager->reschedule(this);
+ }
+ catch(const Ice::Exception& e)
+ {
+ finishWithException(e);
+ }
}
protected: // These protected operations are utiltity functions.
+ /**
+ * Common completion code.
+ */
void finish()
{
+ // Mark internal state as finished.
mFinished = true;
+
+ // Inform our container that we are complete.
mOperationsManager->finished(this);
}
/**
* Initiate a lookup of the requested endpoint.
+ * @param destination Destination to be looked up.
+ * @param current The Ice::Current reference.
*/
void lookupEndpoints(const std::string& destination, const ::Ice::Current current)
{
try
{
// This component's own lookup interface is implemented as AMD.
- // We provide our override of AMD callback.
+ // We provide our override of the appropriate AMD callback.
AMD_EndpointLocator_lookupPtr lookupCallback;
- boost::shared_ptr<WorkQueue::Work> workPtr = mOperationsManager->getOngoingOperationSharedPointer(this);
lookupCallback = new LookupCallback<T>(this);
// Lookup the destination.
@@ -519,7 +561,6 @@ protected: // These protected operations are utiltity functions.
}
}
-
/**
* Forward the start() operation to all sessions in a given sequence.
*/
@@ -701,7 +742,7 @@ protected:
bool mFinished;
EndpointSeq mLookupResult;
- SessionListenerAllocatorPtr mListenerManager;
+ SessionListenerManagerPtr mListenerManager;
OperationsManager* mOperationsManager;
private:
@@ -710,8 +751,12 @@ private:
}; // class SessionRouterOperation
/**
- * This is a specialization of the SessionRouterOperation to handle the
- * routeSession() operation. This object is an instance of WorkQueue::Work so that
+ * This is a specialization of the SessionRouterOperation<T> to handle the
+ * routeSession() operation. The template parameter T is the type of the routeSession()
+ * AMD callback handler to allow this object to send results to the initiator of this
+ * operation.
+ *
+ * This object is an instance of WorkQueue::Work so that
* it can be enqueued to a worker thread.
*/
class RouteSessionOperation : public SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>
@@ -741,9 +786,10 @@ public:
private:
/**
- * We start routing the session by looking up endpoint of the destination.
- * This method represents processing in our initial state.
- * It is executed off the worker thread.
+ * We start routing the session by looking up the endpoint of the destination.
+ *
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
*/
void lookupState()
{
@@ -761,15 +807,13 @@ private:
// Create a listener for the source to handle early termination.
// The wrapper we're using will remove the listener and free it when
// this method is left.
- SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSource));
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSource));
mListenerManager = listener;
- // Set the state to exectute once we've looked up our endpoints.
+ // Set the state handler to exectute once we've looked up our endpoints.
setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
- // mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
-
- // Lookup the destination. This will use AMI, and the callback should
+ // Lookup the destination. This will use AMI, and the callback will
// schedule us to execute again.
lg(Debug) << "routeSession(): Routing destination " << mDestination;
lookupEndpoints(mDestination, mIceCurrent);
@@ -779,7 +823,8 @@ private:
* Entering this state, the destination endpoint has been obtained. This state
* completes the operation by creating the bridge.
*
- * This operation is the final state, and is executed off the worker thread.
+ * This method is called via mCurrentStateHandler when doWork() is executed from the
+ * WorkQueue.
*/
void establishBridgeState()
{
@@ -788,7 +833,12 @@ private:
return;
}
- assert(mLookupResult.size() > 0);
+ assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed.
+ if (mLookupResult.size() < 1)
+ {
+ finishWithException(DestinationNotFoundException(mDestination));
+ return;
+ }
// Add a session to the endpoints.
SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
@@ -848,12 +898,13 @@ private:
}; // class RouteSessionOperation
/**
- * This operation replaces one session in a Bridge with a new session routable
- * by the destination param.
+ * This operation replaces one session in a Bridge with a new session routable by the
+ * destination param. This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type
+ * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to
+ * the initiator of this operation.
*
- * This is a specialization of the SessionRouterOperation that handles the
- * connectBridgedSessionsWithDestination() operation. This object is an instance
- * of WorkQueue::Work so that it can enqueued to a worker thread.
+ * This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread.
*/
class ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
{
@@ -894,7 +945,7 @@ private:
// The wrapper we're using will remove the listener and free it when
// this method is left.
lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
- SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSessionToReplace));
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
mListenerManager = listener;
// Route the destination
@@ -922,6 +973,13 @@ private:
return;
}
+ assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed.
+ if (mLookupResult.size() < 1)
+ {
+ finishWithException(DestinationNotFoundException(mDestination));
+ return;
+ }
+
// Add a session
SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
@@ -977,10 +1035,13 @@ private:
}; // class ConnectBridgedSessionsWithDestinationOperation
/**
- * This is a specialization of the SessionRouterOperation that handles the
- * connectBridgedSessions() operation.
* Replace one session in a Bridge with sessions from another bridge.
- * No routing is actually performed. This operation exists here for consistency,
+ * No routing is actually performed. This operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of
+ * the connectBridgedSessions() AMD callback handler to allow this object to send results to
+ * the initiator of this operation.
+ *
* This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread.
*/
class ConnectBridgedSessionsOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>
@@ -1008,7 +1069,6 @@ public:
private:
/**
* Replace one session in a Bridge with sessions from another bridge.
- * No routing is actually performed. This operation exists here for consistency,
*/
void connectBridgedSessionsState()
{
@@ -1020,10 +1080,9 @@ private:
SessionSeq preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
// Create a listener for the sessions not being replaced to handle early termination.
- // The wrapper we're using will remove the listener and free it when
- // this method is left.
lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
- SessionListenerAllocator listener(mSessionContext.adapter, preserveSessions);
+ SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+ mListenerManager = listener;
// Get the bridge for the sessions being moved.
BridgePrx oldBridge = getBridge(mBridgedSession);
@@ -1031,7 +1090,7 @@ private:
SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
// Check for early termination by the source.
- if (listener->isTerminated())
+ if (mListenerManager->getListener()->isTerminated())
{
lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
@@ -1041,7 +1100,7 @@ private:
// We're through listening, and we will probably interfere with the Bridge's functionality if
// we keep listening.
lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
- listener->unregister();
+ mListenerManager->getListener()->unregister();
// Now replace the sessions.
try
@@ -1073,7 +1132,7 @@ private:
/**
* An implementation of the AMD_EndpointLocator_lookup callback so
* that we can call our own lookup operation.
- * Note that we're not really using AMD, but we're using the same
+ * Note that we're not really dispatching via AMD, but we're using the same
* AMD implementation that other components would use to do a lookup().
*/
template <typename T>
@@ -1143,9 +1202,15 @@ public:
mSessionContext.bridgeManager = bridgeAccessor;
}
+ /**
+ * Enqueue the work to the WorkQueue.
+ */
void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
{
+ // Maintain refs to all ongoing operations.
mOngoingOperations[work.get()] = work;
+
+ // Enqueue work.
mSessionContext.workQueue->enqueue(work);
}
@@ -1167,16 +1232,31 @@ public: // Overrides
}
}
+ /**
+ * Handle a an operation's need to reschedule itself.
+ * The operation doesn't have a shared_ptr to itself, so
+ * it can't do it internally.
+ */
+ virtual void reschedule(WorkQueue::Work *op)
+ {
+ mSessionContext.workQueue->enqueue(getOngoingOperationSharedPointer(op));
+ }
+
+private:
/**
- * The operations sometimes need a shared_ptr to themselves to hand off to callbacks or
- * other objects being processed in other threads.
+ * Find our shared_ptr for a given Work object raw pointer.
*/
- virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
+ boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
{
boost::lock_guard<boost::mutex> guard(mLock);
OperationMap::iterator kvp = mOngoingOperations.find(operation);
assert(kvp != mOngoingOperations.end());
+ if (kvp == mOngoingOperations.end())
+ {
+ throw Ice::UnknownException("SessionRouterPriv: Failed finding shared_ptr for SessionRouter operation.", 1);
+ }
+
return (*kvp).second;
}
@@ -1186,6 +1266,9 @@ public:
boost::mutex mLock;
};
+/**
+ * The SessionRouter implementation.
+ */
SessionRouter::SessionRouter(
const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
@@ -1199,6 +1282,10 @@ SessionRouter::~SessionRouter()
mImpl.reset();
}
+/**
+ * The BridgeManager proxy can only be obtained once our object adapter is activated, so our
+ * bridgeManager reference's initialization is deferred.
+ */
void SessionRouter::setBridgeManager(
const AsteriskSCF::SmartProxy::SmartProxy<
SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
diff --git a/src/SimpleWorkQueue.cpp b/src/SimpleWorkQueue.cpp
index f55ff82..48cabc2 100644
--- a/src/SimpleWorkQueue.cpp
+++ b/src/SimpleWorkQueue.cpp
@@ -101,8 +101,8 @@ void SimpleWorkQueue::pause()
{
mImpl->mLogger(Info) << "SimpleWorkQueue::Pause called for queue " << mImpl->mQid;
- boost::mutex::scoped_lock lock(mImpl->mPauseMutex);
- mImpl->mPaused = true;
+ boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
+ mImpl->mPaused = true;
}
/**
@@ -112,7 +112,7 @@ void SimpleWorkQueue::resume()
{
mImpl->mLogger(Info) << "SimpleWorkQueue::Resume called for queue " << mImpl->mQid;
- boost::mutex::scoped_lock lock(mImpl->mPauseMutex);
+ boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
mImpl->mPaused = false;
mImpl->mPauseCondition.notify_all();
}
@@ -153,13 +153,16 @@ static WorkQueue::PoolId mNoOpPoolId;
/**
* Enqueue an item of work for processing on this queue's thread.
*/
-WorkQueue::PoolId SimpleWorkQueue::enqueue(WorkPtr w)
+WorkQueue::PoolId SimpleWorkQueue::enqueue(const WorkPtr& w)
{
- boost::mutex::scoped_lock lock(mImpl->mQueueMutex);
- bool wasEmpty = mImpl->mQueue.empty();
- mImpl->mQueue.push_back(w);
- int size = mImpl->mQueue.size();
- lock.unlock();
+ bool wasEmpty(false);
+
+ { // scope for the mutex.
+ boost::lock_guard<boost::mutex> lock(mImpl->mQueueMutex);
+ wasEmpty = mImpl->mQueue.empty();
+ mImpl->mQueue.push_back(w);
+ int size = mImpl->mQueue.size();
+ }
if (wasEmpty)
{
@@ -187,7 +190,7 @@ static shared_ptr<WorkQueue::Work> NO_WORK_PTR(new NO_WORK_CLASS());
*/
WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
{
- boost::mutex::scoped_lock lock(mQueueMutex);
+ boost::unique_lock<boost::mutex> lock(mQueueMutex);
int size = mQueue.size(); // debugging
@@ -216,7 +219,7 @@ WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
bool SimpleWorkQueuePriv::isPaused()
{
- boost::mutex::scoped_lock lock(mPauseMutex);
+ boost::lock_guard<boost::mutex> lock(mPauseMutex);
return mPaused;
}
@@ -237,7 +240,7 @@ void SimpleWorkQueuePriv::execute()
while (!mFinished)
{
{ // scope the lock
- boost::mutex::scoped_lock lock(mPauseMutex);
+ boost::unique_lock<boost::mutex> lock(mPauseMutex);
while(mPaused)
{
mLogger(Debug) << "SimpleWorkQueue::Execute: Waiting while paused. Queue ID:" << mQid;
diff --git a/src/SimpleWorkQueue.h b/src/SimpleWorkQueue.h
index 164b1a7..024492c 100644
--- a/src/SimpleWorkQueue.h
+++ b/src/SimpleWorkQueue.h
@@ -41,11 +41,11 @@ public:
SimpleWorkQueue(const std::string& id, const AsteriskSCF::System::Logging::Logger& logger);
~SimpleWorkQueue();
- virtual WorkQueue::PoolId enqueue(WorkPtr w);
+ virtual WorkQueue::PoolId enqueue(const WorkPtr& w);
virtual void terminate();
virtual void join();
- // This implementation adds the concept of Pausing to the generic IWorkQueue.
+ // This implementation adds the concept of Pausing to the generic WorkQueue.
bool isRunning();
bool workPending();
void pause();
diff --git a/src/WorkQueue.h b/src/WorkQueue.h
index c3a7a9a..4d969ad 100644
--- a/src/WorkQueue.h
+++ b/src/WorkQueue.h
@@ -51,7 +51,7 @@ public:
/**
* Enqueue work to be peformed.
*/
- virtual PoolId enqueue(WorkPtr w) = 0;
+ virtual PoolId enqueue(const WorkPtr& w) = 0;
/**
* Enqueue work to be performed, and specify a particular thread
@@ -59,7 +59,7 @@ public:
* Note: Implementations of Thread Pools are expected to override this
* default implementation.
*/
- virtual void enqueue(WorkPtr w, PoolId)
+ virtual void enqueue(const WorkPtr& w, PoolId)
{
enqueue(w);
}
-----------------------------------------------------------------------
--
team/ken.hunt/route_async_routing.git
More information about the asterisk-scf-commits
mailing list