[asterisk-scf-commits] team/ken.hunt/route_async_routing.git branch "route_async" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Dec 22 22:46:48 UTC 2010


branch "route_async" has been updated
       via  a0aae3bb0aac7731d1616b0146fc8d1196381c90 (commit)
      from  fbb061f636eff41df0a87504b6fbb7cde81d6b37 (commit)

Summary of changes:
 src/EndpointRegistry.cpp |    2 +-
 src/SessionRouter.cpp    |  199 +++++++++++++++++++++++++++++++++-------------
 src/SimpleWorkQueue.cpp  |   27 ++++---
 src/SimpleWorkQueue.h    |    4 +-
 src/WorkQueue.h          |    4 +-
 5 files changed, 163 insertions(+), 73 deletions(-)


- Log -----------------------------------------------------------------
commit a0aae3bb0aac7731d1616b0146fc8d1196381c90
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Wed Dec 22 16:46:38 2010 -0600

    Minor code cleanup for review.

diff --git a/src/EndpointRegistry.cpp b/src/EndpointRegistry.cpp
index 8c9ba89..6340009 100644
--- a/src/EndpointRegistry.cpp
+++ b/src/EndpointRegistry.cpp
@@ -190,7 +190,7 @@ public:
 
     void notifyFailed()
     {
-        DestinationNotFoundException e;
+        DestinationNotFoundException e(mDestination);
         mCallback->ice_exception(e);
 
         // clear the mCallback pointer so we only answer once
diff --git a/src/SessionRouter.cpp b/src/SessionRouter.cpp
index fb10b26..18ba7e7 100644
--- a/src/SessionRouter.cpp
+++ b/src/SessionRouter.cpp
@@ -32,6 +32,7 @@ using namespace AsteriskSCF::Core::Endpoint::V1;
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::Core::Routing::V1::Event;
+using namespace AsteriskSCF::Core::Routing::V1;
 using namespace std;
 
 namespace
@@ -282,14 +283,12 @@ private:
 typedef IceInternal::Handle<SessionListenerImpl> SessionListenerImplPtr;
 
 /**
- * This class uses RAII to manage the lifecycle of a session listener.
- * It's sort of a smart pointer for the listener, but it's tightly
- * coupled to the specifics of our private impl.
+ * This class manages the lifecycle of a session listener.
  */
-class SessionListenerAllocator
+class SessionListenerManager
 {
 public:
-    SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
+    SessionListenerManager(Ice::ObjectAdapterPtr adapter, const SessionPrx& session)
         : mSessionListener(new SessionListenerImpl()),
           mAdapter(adapter)
     {
@@ -314,7 +313,7 @@ public:
         }
     }
 
-    SessionListenerAllocator(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
+    SessionListenerManager(Ice::ObjectAdapterPtr adapter, SessionSeq& sessionSequence)
         : mSessionListener(new SessionListenerImpl()),
           mAdapter(adapter)
     {
@@ -342,7 +341,7 @@ public:
         }
     }
 
-    ~SessionListenerAllocator()
+    ~SessionListenerManager()
     {
         // Our private SessionListener implementation adds itself as a servant. It
         // can't really undo that without getting itself deleted. So undo it
@@ -371,11 +370,6 @@ public:
         }
     }
 
-    SessionListenerImpl* operator->() const
-    {
-        return mSessionListener;
-    }
-
     SessionListenerImpl* getListener() const
     {
         return mSessionListener;
@@ -385,12 +379,13 @@ private:
     SessionListenerImpl *mSessionListener;
     Ice::ObjectAdapterPtr mAdapter;
 
-}; // class SessionListenerAllocator
+}; // class SessionListenerManager
+
+typedef boost::shared_ptr<SessionListenerManager> SessionListenerManagerPtr;
 
-typedef boost::shared_ptr<SessionListenerAllocator> SessionListenerAllocatorPtr;
 /**
  * Context required by all of the SessionRouter operations. 
- *  All of the items in the SessionContext are thread-safe. (i.e. no lock required). 
+ *  All of the items in the SessionContext provide thread-safe interfaces (i.e. no lock is required).
  */
 struct SessionContext
 {
@@ -423,22 +418,37 @@ public:
 class OperationsManager
 {
 public:
+    /**
+     * Operations indicate they are finished by calling this method.
+     */
     virtual void finished(WorkQueue::Work *) = 0;
-    virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation) = 0; 
+
+    /**
+     * Operations can reschedule themselves on the WorkQueue. 
+     */
+    virtual void reschedule(WorkQueue::Work *) = 0;
 
 protected:
-    OperationsManager() {}
+    OperationsManager() {}  // Can't construct directly. This class is an interface. 
 };
 
 /**
  * This is a base class for worker objects that offload SessionRouter operations 
- * to a worker thead. It implements the WorkQueue::Work
+ * to a worker thread during an AMD invocation. It implements the WorkQueue::Work
  * interface so that it can be enqueued to a worker thread or thread pool. 
+ * The template parameter T is the AMD callback type for the
+ * particular operation. 
  */
 template<typename T>
 class SessionRouterOperation : public WorkQueue::Work
 {
 public:
+    /**
+     * Constructor. 
+     *  @param amdCallback The callback object to provide results to the initiator of this operation.
+     *  @param context The SessionContext provides references to key objects needed by each operation. 
+     *  @param manager The OperationsManager that is told when this operation finishes or needs to be rescheduled.
+     */
     SessionRouterOperation(const T& amdCallback,
                            const SessionContext& context,
                            OperationsManager* manager,
@@ -455,11 +465,18 @@ public:
     {
     }
 
+    /** 
+     * An implementation of the WorkQueue::Work interface. 
+     */
     virtual void doWork()
     {
         mCurrentStateHandler();
     }
 
+    /**
+     * Inform initiator that this operation finished with 
+     * the specified exception. 
+     */
     void finishWithException(const ::std::exception& e)
     {
         // Forward to this operation's initiator.
@@ -467,47 +484,72 @@ public:
         finish();
     }
 
+    /**
+     * Inform initiator that this operation finished with 
+     * an unspecified exception. 
+     */
     void finishWithException()
     {
         mInitiatorCallback->ice_exception();
         finish();
     }
 
+    /**
+     * Inform initiator that this operation finished 
+     * successfully by sending the (empty) response. 
+     */
     void finishAndSendResult()
     {
         mInitiatorCallback->ice_response();
         finish();
     }
     
+    /**
+     * This operation is called via a LookupCallback as a result of AMI calls 
+     * to the SessionManagers' EndpointLocators. 
+     */
     void setLookupResult(const EndpointSeq& endpoints)
     {
         mLookupResult = endpoints;
 
-        // Reschedule the operation to complete. The operation will have 
-        // advanced to the correct state handler. 
-        mSessionContext.workQueue->enqueue(mOperationsManager->getOngoingOperationSharedPointer(this));
+        // Reschedule this operation to complete.
+        try
+        {
+            mOperationsManager->reschedule(this);
+        }
+        catch(const Ice::Exception& e)
+        {
+            finishWithException(e);
+        }
     }
 
 protected: // These protected operations are utiltity functions. 
 
+    /**
+     * Common completion code. 
+     */
     void finish()
     {
+        // Mark internal state as finished. 
         mFinished = true;
+
+        // Inform our container that we are complete. 
         mOperationsManager->finished(this);
     }
 
     /**
      * Initiate a lookup of the requested endpoint.
+     *  @param destination Destination to be looked up.
+     *  @param current The Ice::Current reference. 
      */
     void lookupEndpoints(const std::string& destination, const ::Ice::Current current)
     {
         try
         {
             // This component's own lookup interface is implemented as AMD. 
-            // We provide our override of AMD callback. 
+            // We provide our override of the appropriate AMD callback. 
             AMD_EndpointLocator_lookupPtr lookupCallback;
 
-            boost::shared_ptr<WorkQueue::Work> workPtr = mOperationsManager->getOngoingOperationSharedPointer(this);
             lookupCallback = new LookupCallback<T>(this);
 
             // Lookup the destination.
@@ -519,7 +561,6 @@ protected: // These protected operations are utiltity functions.
         }
     }
 
-
     /**
      * Forward the start() operation to all sessions in a given sequence. 
      */
@@ -701,7 +742,7 @@ protected:
 
     bool mFinished;
     EndpointSeq mLookupResult;
-    SessionListenerAllocatorPtr mListenerManager;
+    SessionListenerManagerPtr mListenerManager;
     OperationsManager* mOperationsManager;
 
 private:
@@ -710,8 +751,12 @@ private:
 }; // class SessionRouterOperation
 
 /**
- * This is a specialization of the SessionRouterOperation to handle the
- * routeSession() operation. This object is an instance of WorkQueue::Work so that
+ * This is a specialization of the SessionRouterOperation<T> to handle the
+ * routeSession() operation. The template parameter T is the type of the routeSession()
+ * AMD callback handler to allow this object to send results to the initiator of this
+ * operation. 
+ * 
+ * This object is an instance of WorkQueue::Work so that
  * it can be enqueued to a worker thread. 
  */
 class  RouteSessionOperation : public SessionRouterOperation<AMD_SessionRouter_routeSessionPtr>
@@ -741,9 +786,10 @@ public:
 
 private:
     /**
-     * We start routing the session by looking up endpoint of the destination.
-     * This method represents processing in our initial state. 
-     * It is executed off the worker thread. 
+     * We start routing the session by looking up the endpoint of the destination. 
+     *
+     * This method is called via mCurrentStateHandler when doWork() is executed from the 
+     * WorkQueue. 
      */
     void lookupState()
     {
@@ -761,15 +807,13 @@ private:
         // Create a listener for the source to handle early termination.
         // The wrapper we're using will remove the listener and free it when
         // this method is left.
-        SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSource));
+        SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSource));
         mListenerManager = listener;
 
-        // Set the state to exectute once we've looked up our endpoints. 
+        // Set the state handler to execute once we've looked up our endpoints. 
         setState(boost::bind(&RouteSessionOperation::establishBridgeState, this), "establishBridgeState");
 
-        // mCurrentStateHandler = boost::bind(&RouteSessionOperation::establishBridgeState, this);
-
-        // Lookup the destination. This will use AMI, and the callback should 
+        // Lookup the destination. This will use AMI, and the callback will 
         // schedule us to execute again. 
         lg(Debug) << "routeSession(): Routing destination " << mDestination;
         lookupEndpoints(mDestination, mIceCurrent);
@@ -779,7 +823,8 @@ private:
      * Entering this state, the destination endpoint has been obtained. This state
      * completes the operation by creating the bridge. 
      * 
-     * This operation is the final state, and is executed off the worker thread. 
+     * This method is called via mCurrentStateHandler when doWork() is executed from the 
+     * WorkQueue. 
      */
     void establishBridgeState()
     {
@@ -788,7 +833,12 @@ private:
             return;
         }
 
-        assert(mLookupResult.size() > 0);
+        assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed. 
+        if (mLookupResult.size() < 1)
+        {
+            finishWithException(DestinationNotFoundException(mDestination));
+            return;
+        }
 
         // Add a session to the endpoints.
         SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
@@ -848,12 +898,13 @@ private:
 }; // class RouteSessionOperation
 
 /**
- * This operation replaces one session in a Bridge with a new session routable 
- * by the destination param.
+ * This operation replaces one session in a Bridge with a new session routable by the 
+ * destination param. This is a specialization of  SessionRouterOperation<T> that handles the
+ * connectBridgedSessionsWithDestination() operation. The template parameter T is the type 
+ * of the connectBridgedSessionsWithDestination() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
  * 
- * This is a specialization of the SessionRouterOperation that handles the
- * connectBridgedSessionsWithDestination() operation. This object is an instance 
- * of WorkQueue::Work so that it can enqueued to a worker thread. 
+ * This object is an instance of WorkQueue::Work so that it can be enqueued to a worker thread. 
  */
  class  ConnectBridgedSessionsWithDestinationOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsWithDestinationPtr>
 {
@@ -894,7 +945,7 @@ private:
         // The wrapper we're using will remove the listener and free it when
         // this method is left.
         lg(Debug) << "connectBridgedSessionsWithDestination(): Attaching listener";
-        SessionListenerAllocatorPtr listener(new SessionListenerAllocator(mSessionContext.adapter, mSessionToReplace));
+        SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, mSessionToReplace));
         mListenerManager = listener;
 
         // Route the destination
@@ -922,6 +973,13 @@ private:
             return;
         }
 
+        assert(mLookupResult.size() > 0); // This exception should have been handled in EndpointRegistry if lookup failed. 
+        if (mLookupResult.size() < 1)
+        {
+            finishWithException(DestinationNotFoundException(mDestination));
+            return;
+        }
+
         // Add a session 
         SessionSeq newSessions = createSessionForEndpoints(mLookupResult, mDestination);
 
@@ -977,10 +1035,13 @@ private:
 }; // class ConnectBridgedSessionsWithDestinationOperation
 
 /**
- * This is a specialization of the SessionRouterOperation that handles the
- * connectBridgedSessions() operation. 
  * Replace one session in a Bridge with sessions from another bridge.
- * No routing is actually performed. This operation exists here for consistency,
+ * No routing is actually performed. This operation exists here for consistency.
+ * This is a specialization of SessionRouterOperation<T> that handles the
+ * connectBridgedSessions() operation. The template parameter T is the type of 
+ * the connectBridgedSessions() AMD callback handler to allow this object to send results to 
+ * the initiator of this operation. 
+ *
  * This object is an instance of WorkQueue::Work so that it can enqueued to a worker thread. 
  */
 class  ConnectBridgedSessionsOperation : public SessionRouterOperation<AMD_SessionRouter_connectBridgedSessionsPtr>
@@ -1008,7 +1069,6 @@ public:
 private:
     /**
      * Replace one session in a Bridge with sessions from another bridge.
-     * No routing is actually performed. This operation exists here for consistency,
      */
     void connectBridgedSessionsState()
     {
@@ -1020,10 +1080,9 @@ private:
         SessionSeq preserveSessions = getSessionsInBridge(mergeBridge, mSessionToReplace);
 
         // Create a listener for the sessions not being replaced to handle early termination.
-        // The wrapper we're using will remove the listener and free it when
-        // this method is left.
         lg(Debug) << "connectBridgedSessions(): Adding listener to " << preserveSessions.size() << " session(s)." ;
-        SessionListenerAllocator listener(mSessionContext.adapter, preserveSessions);
+        SessionListenerManagerPtr listener(new SessionListenerManager(mSessionContext.adapter, preserveSessions));
+        mListenerManager = listener;
 
         // Get the bridge for the sessions being moved.
         BridgePrx oldBridge = getBridge(mBridgedSession);
@@ -1031,7 +1090,7 @@ private:
         SessionSeq migratingSessions = removeSessionsFromBridge(oldBridge, mBridgedSession);
 
         // Check for early termination by the source.
-        if (listener->isTerminated())
+        if (mListenerManager->getListener()->isTerminated())
         {
             lg(Notice) << "connectBridgedSessions(): Source ended session before transfer in connectBridgedSessions(). " ;
             finishWithException(SourceTerminatedPreBridgingException(preserveSessions[0]->getEndpoint()->getId()));
@@ -1041,7 +1100,7 @@ private:
         // We're through listening, and we will probably interfere with the Bridge's functionality if
         // we keep listening.
         lg(Debug) << "connectBridgedSessions(): Removing listener. " ;
-        listener->unregister();
+        mListenerManager->getListener()->unregister();
 
         // Now replace the sessions.
         try
@@ -1073,7 +1132,7 @@ private:
 /** 
  * An implementation of the AMD_EndpointLocator_lookup callback so 
  * that we can call our own lookup operation. 
- * Note that we're not really using AMD, but we're using the same
+ * Note that we're not really dispatching via AMD, but we're using the same
  * AMD implementation that other components would use to do a lookup(). 
  */
 template <typename T>
@@ -1143,9 +1202,15 @@ public:
         mSessionContext.bridgeManager = bridgeAccessor;
     }
 
+    /**
+     * Enqueue the work to the WorkQueue. 
+     */
     void scheduleOperation(const boost::shared_ptr<WorkQueue::Work>& work)
     {
+        // Maintain refs to all ongoing operations. 
         mOngoingOperations[work.get()] = work;
+
+        // Enqueue work. 
         mSessionContext.workQueue->enqueue(work);
     }
 
@@ -1167,16 +1232,31 @@ public: // Overrides
         }
     }
 
+    /**
+     * Handle an operation's need to reschedule itself.  
+     * The operation doesn't have a shared_ptr to itself, so
+     * it can't do it internally. 
+     */
+    virtual void reschedule(WorkQueue::Work *op)
+    {
+        mSessionContext.workQueue->enqueue(getOngoingOperationSharedPointer(op));
+    }
+
+private:
     /** 
-     * The operations sometimes need a shared_ptr to themselves to hand off to callbacks or
-     * other objects being processed in other threads. 
+     * Find our shared_ptr for a given Work object raw pointer. 
      */
-    virtual boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
+    boost::shared_ptr<WorkQueue::Work> getOngoingOperationSharedPointer(WorkQueue::Work* operation)
     {
         boost::lock_guard<boost::mutex> guard(mLock);
         OperationMap::iterator kvp = mOngoingOperations.find(operation);
 
         assert(kvp != mOngoingOperations.end());
+        if (kvp == mOngoingOperations.end())
+        {
+            throw Ice::UnknownException("SessionRouterPriv: Failed finding shared_ptr for SessionRouter operation.", 1);
+        }
+
         return (*kvp).second;
     }
 
@@ -1186,6 +1266,9 @@ public:
     boost::mutex mLock;
 };
 
+/**
+ * The SessionRouter implementation. 
+ */
 SessionRouter::SessionRouter(
                   const Ice::ObjectAdapterPtr& objectAdapter, const EndpointRegistryPtr& endpointRegistry,
                   const AsteriskSCF::Core::Routing::V1::Event::RoutingEventsPtr& eventPublisher,
@@ -1199,6 +1282,10 @@ SessionRouter::~SessionRouter()
     mImpl.reset();
 }
 
+/** 
+ * The BridgeManager proxy can only be obtained once our object adapter is activated, so our
+ * bridgeManager reference's initialization is deferred. 
+ */
 void SessionRouter::setBridgeManager(
     const AsteriskSCF::SmartProxy::SmartProxy<
         SessionCommunications::V1::BridgeManagerPrx>& bridgeAccessor)
diff --git a/src/SimpleWorkQueue.cpp b/src/SimpleWorkQueue.cpp
index f55ff82..48cabc2 100644
--- a/src/SimpleWorkQueue.cpp
+++ b/src/SimpleWorkQueue.cpp
@@ -101,8 +101,8 @@ void SimpleWorkQueue::pause()
 {
 	mImpl->mLogger(Info) << "SimpleWorkQueue::Pause called for queue " << mImpl->mQid;
 
-   boost::mutex::scoped_lock lock(mImpl->mPauseMutex);
-   mImpl->mPaused = true;
+    boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
+    mImpl->mPaused = true;
 }
 
 /**
@@ -112,7 +112,7 @@ void SimpleWorkQueue::resume()
 {
    mImpl->mLogger(Info) << "SimpleWorkQueue::Resume called for queue " << mImpl->mQid;
 
-   boost::mutex::scoped_lock lock(mImpl->mPauseMutex);
+   boost::lock_guard<boost::mutex> lock(mImpl->mPauseMutex);
    mImpl->mPaused = false;
    mImpl->mPauseCondition.notify_all();
 }
@@ -153,13 +153,16 @@ static WorkQueue::PoolId mNoOpPoolId;
 /**
  * Enqueue an item of work for processing on this queue's thread. 
  */
-WorkQueue::PoolId SimpleWorkQueue::enqueue(WorkPtr w)
+WorkQueue::PoolId SimpleWorkQueue::enqueue(const WorkPtr& w)
 {
-   boost::mutex::scoped_lock lock(mImpl->mQueueMutex);
-   bool wasEmpty = mImpl->mQueue.empty();
-   mImpl->mQueue.push_back(w);
-   int size = mImpl->mQueue.size();
-   lock.unlock();
+    bool wasEmpty(false);
+
+    { // scope for the mutex.
+        boost::lock_guard<boost::mutex> lock(mImpl->mQueueMutex);
+        wasEmpty = mImpl->mQueue.empty();
+        mImpl->mQueue.push_back(w);
+        int size = mImpl->mQueue.size();
+    }
  
    if (wasEmpty)
    { 
@@ -187,7 +190,7 @@ static shared_ptr<WorkQueue::Work> NO_WORK_PTR(new NO_WORK_CLASS());
  */
 WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
 {
-    boost::mutex::scoped_lock lock(mQueueMutex);
+    boost::unique_lock<boost::mutex> lock(mQueueMutex);
 
     int size = mQueue.size(); // debugging
 
@@ -216,7 +219,7 @@ WorkPtr SimpleWorkQueuePriv::waitAndDequeue()
 
 bool SimpleWorkQueuePriv::isPaused()
 {
-    boost::mutex::scoped_lock lock(mPauseMutex);
+    boost::lock_guard<boost::mutex> lock(mPauseMutex);
     return mPaused;
 }
 
@@ -237,7 +240,7 @@ void SimpleWorkQueuePriv::execute()
    while (!mFinished)
    {
         { // scope the lock
-            boost::mutex::scoped_lock lock(mPauseMutex);
+            boost::unique_lock<boost::mutex> lock(mPauseMutex);
             while(mPaused)
             {
                 mLogger(Debug) << "SimpleWorkQueue::Execute: Waiting while paused. Queue ID:" << mQid;
diff --git a/src/SimpleWorkQueue.h b/src/SimpleWorkQueue.h
index 164b1a7..024492c 100644
--- a/src/SimpleWorkQueue.h
+++ b/src/SimpleWorkQueue.h
@@ -41,11 +41,11 @@ public:
     SimpleWorkQueue(const std::string& id, const AsteriskSCF::System::Logging::Logger& logger);
     ~SimpleWorkQueue();
 
-    virtual WorkQueue::PoolId enqueue(WorkPtr w);
+    virtual WorkQueue::PoolId enqueue(const WorkPtr& w);
     virtual void terminate();
     virtual void join();
 
-    // This implementation adds the concept of Pausing to the generic IWorkQueue. 
+    // This implementation adds the concept of Pausing to the generic WorkQueue. 
     bool isRunning();
     bool workPending();
     void pause();
diff --git a/src/WorkQueue.h b/src/WorkQueue.h
index c3a7a9a..4d969ad 100644
--- a/src/WorkQueue.h
+++ b/src/WorkQueue.h
@@ -51,7 +51,7 @@ public:
     /**
      * Enqueue work to be peformed. 
      */
-    virtual PoolId enqueue(WorkPtr w) = 0;
+    virtual PoolId enqueue(const WorkPtr& w) = 0;
 
     /**
      * Enqueue work to be performed, and specify a particular thread
@@ -59,7 +59,7 @@ public:
      * Note: Implementations of Thread Pools are expected to override this
      * default implementation. 
      */
-    virtual void enqueue(WorkPtr w, PoolId) 
+    virtual void enqueue(const WorkPtr& w, PoolId) 
     {
         enqueue(w);
     }

-----------------------------------------------------------------------


-- 
team/ken.hunt/route_async_routing.git



More information about the asterisk-scf-commits mailing list