[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri Mar 18 13:07:45 CDT 2011


branch "async-bridging" has been updated
       via  9b6ca85f6ca32fb9125b985d4ae1d62d145cf100 (commit)
      from  a911c2e9da43c5b8ff6e54da7d91e452765a0c3a (commit)

Summary of changes:
 config/test_bridging.conf.in |    1 +
 src/BridgeImpl.cpp           |  637 +++++++++++++++++++++++++++---------------
 src/BridgeImpl.h             |   10 +-
 src/BridgeManagerImpl.cpp    |  102 ++++---
 src/MediaSplicer.cpp         |   55 +++-
 src/SessionWrapper.cpp       |   19 +-
 src/Tasks.h                  |  118 ++++----
 7 files changed, 596 insertions(+), 346 deletions(-)


- Log -----------------------------------------------------------------
commit 9b6ca85f6ca32fb9125b985d4ae1d62d145cf100
Author: Brent Eagles <beagles at digium.com>
Date:   Fri Mar 18 15:36:49 2011 -0230

    Cleanup and fix the executors and add some logging.
    Hook up AMD callbacks to task queues.
    Finish AMD'ifying BridgeManager and Bridge Methods.

diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 461f245..ec7ced3 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -5,6 +5,7 @@ AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
 AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
 AsteriskSCFIceStorm.Transient=1
 TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
+Ice.ThreadPool.Client.Size=4
 
 LoggerAdapter.Endpoints=default
 AsteriskSCF.LoggingService.Endpoints=default
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index d29fb29..9a4c051 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -77,7 +77,6 @@ public:
     //
     // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
     //
-    void addSessions(const SessionSeq& sessions);
     void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
     void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
     void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
@@ -90,7 +89,6 @@ public:
     void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
     void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
 
-    void replaceSession(const SessionPrx&, const SessionSeq& newSessions, const Ice::Current& current);
     void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac, const SessionPrx& sessionToReplace,
             const SessionSeq& newSessions, const Ice::Current& current);
 
@@ -110,7 +108,9 @@ public:
     string id();
     SessionCollectionPtr sessions();
 
-    void spawnShutdown();
+    void forceUpdate();
+
+    void getAddSessionsTasks(QueuedTasks& tasks, const SessionSeq& sessions);
 
 private:
 
@@ -153,11 +153,6 @@ private:
     //
     Logger mLogger;
 
-    //
-    // TODO: check to see if this is really needed. If so, it should really be done by a thread pool.
-    //
-    IceUtil::Handle<IceUtil::Thread> mShutdownThread;
-
     void statePreCheck();
     BridgeStateItemPtr createUpdate();
     void pushUpdate(const BridgeStateItemPtr& update);
@@ -196,6 +191,314 @@ static void checkSessions(const SessionSeq& sessions)
 //
 static const string TopicPrefix("AsteriskSCF.Bridge.");
 
+class ShutdownThread : public IceUtil::Thread
+{
+public:
+    ShutdownThread(const BridgePrx& bridge) :
+        mBridge(bridge)
+    {
+    }
+
+    void run()
+    {
+        try
+        {
+            mBridge->shutdown();
+        }
+        catch (...)
+        {
+        }
+    }
+private:
+    BridgePrx mBridge;
+};
+
+class SessionsTracker : public IceUtil::Shared
+{
+public:
+    SessionsTracker() :
+        mResponses(0)
+    {
+    }
+    
+    void add(const SessionPrx& s)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mSessions.push_back(s);
+        ++mResponses;
+    }
+ 
+    SessionSeq getSessions()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mSessions;
+    }
+
+    void addException(const SessionPrx& session, const Ice::Exception& ex)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        SessionError error;
+        error.failedSession = session;
+        error.message = ex.what();
+        mExceptions.push_back(error);
+        ++mResponses;
+    }
+
+    void addExceptionMessage(const SessionPrx& session, const string& msg)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        SessionError error;
+        error.failedSession = session;
+        error.message = msg;
+        mExceptions.push_back(error);
+        ++mResponses;
+    }
+
+    SessionErrorSeq getExceptions()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mExceptions;
+    }
+
+    size_t responseCount()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mResponses;
+    }
+
+private:
+    boost::shared_mutex mLock;
+    SessionSeq mSessions;
+    SessionErrorSeq mExceptions;
+    size_t mResponses;
+};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
+
+class RemoveSessionsNotify : public QueuedTask
+{
+public:
+    RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
+            const SessionsTrackerPtr& tracker) :
+        QueuedTask("RemoveSessionsNotify"),
+        mBridgeListeners(bridgeListeners),
+        mTracker(tracker)
+    {
+    }
+    
+protected:
+    bool executeImpl()
+    {
+        SessionSeq sessions = mTracker->getSessions();
+        if (!sessions.empty())
+        {
+            mBridgeListeners->sessionsRemoved(sessions);
+        }
+        return true;
+    }
+           
+private:
+    BridgeListenerMgrPtr mBridgeListeners;
+    SessionsTrackerPtr mTracker;
+};
+
+class SetBridgeTask : public QueuedTask
+{
+public:
+    SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+            const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
+        QueuedTask("SetBridgeTask"),
+        mSessionManager(sessionCollection),
+        mBridge(bridge),
+        mSessionListener(listener),
+        mSessions(sessions),
+        mTracker(tracker)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        bool tasksDispatched = false;
+        for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+        {
+            SessionWrapperPtr session = mSessionManager->addSession(*i);
+            if (session == 0)
+            {
+                //
+                // This shouldn't happen!
+                //
+                mTracker->addExceptionMessage(*i, "session already added");
+                continue;
+            }
+            tasksDispatched = true;
+            (*i)->begin_setBridge(mBridge, mSessionListener,
+                    newCallback_Session_setBridge(this, &SetBridgeTask::set,
+                            &SetBridgeTask::failed), session);
+        }
+        return !tasksDispatched;
+    }
+
+    void set(const SessionInfoPtr& info, const SessionWrapperPtr& session)
+    {
+        mTracker->add(session->getSession());
+        if (info->currentState != "ready")
+        {
+            //
+            // setupMedia is an AMI backed implementation, so should not block here.
+            //
+            session->setupMedia();
+        }
+        if (mTracker->responseCount() == mSessions.size())
+        {
+            mListener->succeeded();
+        }
+    }
+
+    void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+    {
+        //
+        // TODO:
+        // * Log exception.
+        // * Rollback
+        // * Interpret exception and decide whether or not to fail the entire operation.
+        // Currently the semantics allow some operations to fail.
+        //
+        mTracker->addException(session->getSession(), ex);
+        try
+        {
+            //
+            // We want to make sure that session isn't left lying around. The session collection will
+            // take care of cleaning it up as long as it is marked as destroyed.
+            //
+            session->destroy();
+        }
+        catch(...)
+        {
+        }
+        if (mTracker->responseCount() == mSessions.size())
+        {
+            SessionErrorSeq exceptions = mTracker->getExceptions();
+            if (exceptions.size() == mSessions.size())
+            {
+                mListener->failed();
+            }
+            else
+            {
+                mListener->succeeded();
+            }
+        }
+    }
+    
+private:
+    SessionCollectionPtr mSessionManager;
+    BridgePrx mBridge;
+    SessionListenerPrx mSessionListener;
+    SessionSeq mSessions;
+    SessionsTrackerPtr mTracker;
+};
+
+class AddToListeners : public QueuedTask
+{
+public:
+    AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker) :
+        QueuedTask("AddToListeners"),
+        mListeners(listeners),
+        mTracker(tracker)
+    {
+    }
+    
+protected:
+    bool executeImpl()
+    {
+        mListeners->sessionsAdded(mTracker->getSessions());
+        return true;
+    }
+    
+private:
+    BridgeListenerMgrPtr mListeners;
+    SessionsTrackerPtr mTracker;
+};
+
+class CheckShutdown : public QueuedTask
+{
+public:
+    CheckShutdown(const BridgeImplPtr& bridge, const BridgePrx& proxy) :
+        QueuedTask("CheckShutdown"),
+        mBridge(bridge),
+        mPrx(proxy)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        if (mBridge->sessions()->size() < 2 && mPrx)
+        {
+            IceUtil::Handle<IceUtil::Thread> t = new ShutdownThread(mPrx);
+            t->start();
+        }
+        return true;
+    }
+private:
+    BridgeImplPtr mBridge;
+    BridgePrx mPrx;
+};
+
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+    GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+        QueuedTask("GenericAMDCallback"),
+        mCallback(cb),
+        mTracker(tracker)
+    {
+    }
+protected:
+
+    bool executeImpl()
+    {
+        mCallback->ice_response();
+        return true;
+    }
+
+    void failImpl()
+    {
+        SessionErrorSeq errors(mTracker->getExceptions());
+        if (!errors.empty())
+        {
+            mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+        }
+        else
+        {
+            mCallback->ice_exception();
+        }
+    }
+
+private:
+    T mCallback;
+    SessionsTrackerPtr mTracker;
+};
+
+class UpdateTask : public QueuedTask
+{
+public:
+    UpdateTask(const BridgeImplPtr& bridge) :
+        QueuedTask("UpdateTask"),
+        mBridge(bridge)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mBridge->forceUpdate();
+        return true;
+    }
+private:
+    BridgeImplPtr mBridge;
+};
+
 } // End of anonymous namespace
 
 BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter, 
@@ -231,91 +534,33 @@ BridgeImpl::~BridgeImpl()
     //
 }
 
-void BridgeImpl::addSessions(const SessionSeq& sessions)
-{
-    if (sessions.size() == 0)
-    {
-        return;
-    }
-    checkSessions(sessions);
-    statePreCheck();
-    mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
-
-    SessionSeq addedSessions;
-    for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
-    {
-        try
-        {
-            SessionWrapperPtr session = mSessions->addSession(*i);
-            if (session)
-            {
-                SessionInfoPtr info;
-                try
-                {
-                    RetryPolicy policy(5, 500);
-                    //
-                    // canRetry should never return false since we throw ourselves out of this loop. But
-                    // we'll do it here in case we decide to do something else.
-                    //
-                    while (policy.canRetry())
-                    {
-                        try
-                        {
-                            //
-                            // TODO : AMI!
-                            //
-                            info = (*i)->setBridge(mPrx, mSessionListenerPrx);
-                            addedSessions.push_back(*i);
-                            break;
-                        }
-                        catch (const Ice::ConnectionLostException&)
-                        {
-                            if (!policy.retry())
-                            {
-                                throw;
-                            }
-                        }
-                    }
-                }
-                catch (const Ice::Exception& ex)
-                {
-                    mLogger(Debug) << FUNLOG << ": " << (*i)->ice_toString() << " threw " << ex.what() 
-                                   << " continuing";
-                }
-                if (info->currentState != "ready")
-                {
-                    //
-                    // We setup media.
-                    // TODO: AMI should come into play here.
-                    //
-                    session->setupMedia();
-                }
-            }
-        }
-        catch (const Ice::Exception&)
-        {
-        }
-    }
-
-    BridgeStateItemPtr update;
-    if (addedSessions.size())
-    {
-        mListeners->sessionsAdded(addedSessions);
-        {
-            boost::unique_lock<boost::shared_mutex> lock(mLock);
-            update = createUpdate();
-        }
-    }
-    pushUpdate(update);
-}
-
 void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions,
         const Ice::Current& current)
 {
     try
     {
-        addSessions(sessions);
-        callback->ice_response();
+        if (sessions.size() == 0 && callback)
+        {
+            callback->ice_response();
+            return;
+        }
+        checkSessions(sessions);
+        statePreCheck();
+        mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
+
+        SessionsTrackerPtr tracker(new SessionsTracker);
+        QueuedTasks tasks;
+        tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+        tasks.push_back(new AddToListeners(mListeners, tracker));
+        tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
+        tasks.push_back(new UpdateTask(this));
+        ExecutorPtr executor(new Executor(tasks, mLogger));
+        executor->start();
+        //
+        // When the operations have all completed, that last task will take care of handling
+        // the callback. It's all left within the try/catch in the event something happens during
+        // task startup.
+        //
     }
     catch (const std::exception& ex)
     {
@@ -327,45 +572,42 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
     }
 }
 
-void BridgeImpl::removeSessions(const SessionSeq& sessions, const Ice::Current& current)
-{
-    if (sessions.size() == 0)
-    {
-        return;
-    }
-    checkSessions(sessions);
-    SessionSeq removedSessions;
-    statePreCheck();
-    for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
-    {
-        SessionWrapperPtr session = mSessions->getSession(*i);
-        if (session)
-        {
-            session->shutdown(mSessionListenerPrx, new ResponseCode);
-            removedSessions.push_back(*i);
-        }
-    }
-
-    if (removedSessions.size() != 0)
-    {
-        mListeners->sessionsRemoved(removedSessions);
-    }
-
-    SessionSeq sessionsLeft(mSessions->getSessionSeq());
-    if (sessionsLeft.size() < 2)
-    {
-        spawnShutdown();
-    }
-    mSessions->reap();
-}
-
 void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
         const Ice::Current& current)
 {
     try
     {
-        removeSessions(sessions, current);
-        callback->ice_response();
+        if (sessions.size() == 0)
+        {
+            callback->ice_response();
+            return;
+        }
+        checkSessions(sessions);
+        statePreCheck();
+
+        //
+        // The shutdown of individual sessions is implemented as a series of AMI requests. Once initiated,
+        // we allow them to proceed asynchronously and do not concern ourselves with the result.
+        // The logic of shutdown should remove them either because the operations succeeded or
+        // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+        // is pointless.
+        //
+        SessionsTrackerPtr removed(new SessionsTracker);
+        for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
+        {
+            SessionWrapperPtr session = mSessions->getSession(*i);
+            if (session)
+            {
+                session->shutdown(mSessionListenerPrx, new ResponseCode);
+                removed->add(session->getSession());
+            }
+        }
+        QueuedTasks tasks;
+        tasks.push_back(new RemoveSessionsNotify(mListeners, removed));
+        tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
+        tasks.push_back(new CheckShutdown(this, mPrx));
+        ExecutorPtr runner(new Executor(tasks, mLogger));
+        runner->start();
     }
     catch (const std::exception& ex)
     {
@@ -519,90 +761,40 @@ void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Cu
     }
 }
 
-void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq& newSessions, const Ice::Current& current)
-{
-    mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
-
-    checkSessions(newSessions);
-    statePreCheck();
-    
-    SessionWrapperPtr session = mSessions->getSession(oldSession);
-    //
-    // If the session did not exist on this bridge, then this operation should not proceed.
-    //
-    if (!session)
-    {
-        throw SessionNotFound(oldSession);
-    }
-    session->shutdown(mSessionListenerPrx, new ResponseCode);
-
-    SessionSeq removed;
-    removed.push_back(oldSession);
-    mListeners->sessionsRemoved(removed);
-
-    SessionSeq added;
-    for (SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
-    {
-        try
-        {
-            RetryPolicy policy(5, 500);
-            //
-            // canRetry should never return false since we throw ourselves out of this loop. But
-            // we'll do it here in case we decide to do something else.
-            //
-            while (policy.canRetry())
-            {
-                try
-                {
-                    SessionInfoPtr info = (*i)->setBridge(mPrx, mSessionListenerPrx);
-                    SessionWrapperPtr session = mSessions->addSession(*i);
-                    if (info->currentState != "ready")
-                    {
-                        //
-                        // We setup media.
-                        // TODO: AMI should come into play here.
-                        //
-                        session->setupMedia();
-                    }
-                    added.push_back(*i);
-                    
-                    break;
-                }
-                catch (const Ice::ConnectionLostException&)
-                {
-                    if (!policy.retry())
-                    {
-                        throw;
-                    }
-                }
-            }
-        }
-        catch (const Ice::Exception& ex)
-        {
-            //
-            // We need to continue if setBridge fails for some reason. Rolling
-            // back the other sessions would be difficult. The bridge does not
-            // know enough to try for system wide consistency. An OTS would
-            // really be required if things like replaceSessions() were to be
-            // atomic.
-            //
-            mLogger(Info) << FUNLOG << ": setting the bridge on " << *i << " threw " << ex.what();
-        }
-    }
-
-    //
-    // Now update the listeners.
-    //
-    mListeners->sessionsAdded(added);
-}
-
 void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callback, const SessionPrx& sessionToReplace,
         const SessionSeq& newSessions, const Ice::Current& current)
 {
     try
     {
-        replaceSession(sessionToReplace, newSessions, current);
-        callback->ice_response();
+        mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+
+        checkSessions(newSessions);
+        statePreCheck();
+        
+        SessionWrapperPtr session = mSessions->getSession(sessionToReplace);
+        //
+        // If the session did not exist on this bridge, then this operation should not proceed.
+        //
+        if (!session)
+        {
+            throw SessionNotFound(sessionToReplace);
+        }
+        //
+        // Shutdown is inherently asynchronous (see SessionWrapper::shutdown())
+        //
+        SessionsTrackerPtr removeTracker(new SessionsTracker);
+        removeTracker->add(session->getSession());
+        session->shutdown(mSessionListenerPrx, new ResponseCode);
+        
+        SessionsTrackerPtr tracker(new SessionsTracker);
+        QueuedTasks tasks;
+        tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker));
+        tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
+        tasks.push_back(new AddToListeners(mListeners, tracker));
+        tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
+        tasks.push_back(new UpdateTask(this));
+        ExecutorPtr executor(new Executor(tasks, mLogger));
+        executor->start();
     }
     catch (const std::exception& ex)
     {
@@ -725,39 +917,23 @@ SessionCollectionPtr BridgeImpl::sessions()
     return mSessions;
 }
 
-namespace
+void BridgeImpl::forceUpdate()
 {
-class ShutdownThread : public IceUtil::Thread
-{
-public:
-    ShutdownThread(const BridgePrx& bridge) :
-        mBridge(bridge)
-    {
-    }
-
-    void run()
+    BridgeStateItemPtr update;
     {
-        try
-        {
-            mBridge->shutdown();
-        }
-        catch (...)
-        {
-        }
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        update = createUpdate();
     }
-private:
-    BridgePrx mBridge;
-};
+    pushUpdate(update);
 }
 
-void BridgeImpl::spawnShutdown()
+void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
+        const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
 {
-    boost::shared_lock<boost::shared_mutex> lock(mLock);
-    if (!mShutdownThread)
-    {
-        mShutdownThread = new ShutdownThread(mPrx);
-        mShutdownThread->start();
-    }
+    SessionsTrackerPtr tracker(new SessionsTracker);
+    tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+    tasks.push_back(new AddToListeners(mListeners, tracker));
+    tasks.push_back(new UpdateTask(this));
 }
 
 void BridgeImpl::statePreCheck()
@@ -805,7 +981,7 @@ void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
 
 void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
 {
-    if (update.size() > 0 && replicate())
+    if (!update.empty() && replicate())
     {
         mReplicator->setState(update);
     }
@@ -848,7 +1024,8 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
         const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
 {
     logger(Debug) << FUNLOG << ": creating replica for " << state->bridgeId;
-    IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
+    IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(
+        new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
                     listenerMgr, replicator, state, logger));
     
     return bridge;
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 1276482..fd5dbfa 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -96,8 +96,6 @@ public:
      **/
     virtual void activate(const AsteriskSCF::SessionCommunications::V1::BridgePrx& proxy) = 0;
 
-    virtual void addSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions) = 0;
-
     /**
      *
      * Replication helper methods
@@ -113,6 +111,14 @@ public:
 
     virtual SessionCollectionPtr sessions() = 0;
 
+    //
+    // Allows an external agent to force a state update replication.
+    //
+    virtual void forceUpdate() = 0;
+
+    virtual void getAddSessionsTasks(QueuedTasks& tasks,
+            const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions) = 0;
+
     /**
      *
      * Internal implementation detail interface.
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 64b13ad..7d90ed5 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -60,8 +60,6 @@ public:
     //
     // AsteriskSCF::SessionCommunications::V1::BridgeManager Interface
     //
-    BridgePrx createBridge(const SessionSeq& endpoints, const BridgeListenerPrx& listener, const Ice::Current& current);
-
     void createBridge_async(const AMD_BridgeManager_createBridgePtr& request, const SessionSeq& endpoints,
             const BridgeListenerPrx& listener, const Ice::Current&);
 
@@ -172,59 +170,81 @@ BridgeManagerImpl::~BridgeManagerImpl()
     mListeners = 0;
 }
 
-BridgePrx BridgeManagerImpl::createBridge(const SessionSeq& sessions,
-  const BridgeListenerPrx& listener, const Ice::Current& current)
+class FinishUp : public QueuedTask
 {
-    mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    statePreCheck(BOOST_CURRENT_FUNCTION);
-    reap();
-
-    string stringId = string("bridge.") + IceUtil::generateUUID();
-    Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
-    BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
-    BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
-    vector<BridgeListenerPrx> listeners(mState->defaultBridgeListeners);
-    if (listener)
+public:
+    FinishUp(const AMD_BridgeManager_createBridgePtr& callback,
+            const BridgeManagerListenerMgrPtr& listenerMgr,
+            const BridgePrx& bridgeProxy) :
+        QueuedTask("FinishUp"),
+        mCallback(callback),
+        mListenerMgr(listenerMgr),
+        mBridgeProxy(bridgeProxy)
     {
-        listeners.push_back(listener);
     }
+    
+protected:
 
-    BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicator, mLogger);
-    Ice::ObjectPrx obj = mAdapter->add(bridge, id);
-
-    mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
-    BridgeInfo info;
-    info.servant = bridge;
-    info.proxy = BridgePrx::uncheckedCast(obj);
-
-    bridge->activate(info.proxy);
-
-    if (mListeners)
+    bool executeImpl()
     {
-        mListeners->bridgeCreated(info.proxy);
+        if (mListenerMgr)
+        {
+            mListenerMgr->bridgeCreated(mBridgeProxy);
+        }
+        mCallback->ice_response(mBridgeProxy);
+        return true;
     }
-    else
+
+    void failImpl()
     {
-        mLogger(Debug) << ": bridgeCreated event not published as there are no listeners configured.";
+        mCallback->ice_exception();
     }
-    mBridges.push_back(info);
-
-    //
-    // Don't forget to add the initial sessions.
-    //
-    bridge->addSessions(sessions);
 
-    return info.proxy;
-}
+private:
+    AMD_BridgeManager_createBridgePtr mCallback;
+    BridgeManagerListenerMgrPtr mListenerMgr;
+    BridgePrx mBridgeProxy;
+};
 
 void BridgeManagerImpl::createBridge_async(const AMD_BridgeManager_createBridgePtr& callback,
-        const SessionSeq& endpoints, const BridgeListenerPrx& listener, const Ice::Current& current)
+        const SessionSeq& sessions, const BridgeListenerPrx& listener, const Ice::Current& current)
 {
     try
     {
-        BridgePrx result = createBridge(endpoints, listener, current);
-        callback->ice_response(result);
+        mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        statePreCheck(BOOST_CURRENT_FUNCTION);
+        reap();
+
+        string stringId = string("bridge.") + IceUtil::generateUUID();
+        Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
+        BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
+        BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
+        vector<BridgeListenerPrx> listeners(mState->defaultBridgeListeners);
+        if (listener)
+        {
+            listeners.push_back(listener);
+        }
+
+        BridgeServantPtr bridge = BridgeServant::create(stringId, mAdapter, listeners, mgr, mReplicator, mLogger);
+        Ice::ObjectPrx obj = mAdapter->add(bridge, id);
+
+        mLogger(Info) << objectIdFromCurrent(current) << ": creating new bridge " << obj->ice_toString() << "." ;
+        BridgeInfo info;
+        info.servant = bridge;
+        info.proxy = BridgePrx::uncheckedCast(obj);
+        bridge->activate(info.proxy);
+        mBridges.push_back(info);
+
+        QueuedTasks tasks;
+        if (!sessions.empty())
+        {
+            bridge->getAddSessionsTasks(tasks, sessions);
+        }
+        
+        tasks.push_back(new FinishUp(callback, mListeners, info.proxy));
+        ExecutorPtr runner(new Executor(tasks, mLogger));
+        runner->start();
     }
     catch (const std::exception& ex)
     {
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 7fab2b1..619d5c7 100644
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -23,6 +23,13 @@
 #include "ServiceUtil.h"
 #include "SessionWrapper.h"
 
+//
+// TODO:
+// * Reasonable retry semantics.
+// * Rollback on connection failures.
+// * Cleanup of MediaConnectorI.
+// * Conference bridging.
+//
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Bridge::V1;
@@ -36,21 +43,25 @@ namespace BridgeService
 // Forward declarations.
 //
 class MediaSplicerI;
-class MedixMixerPtr;
+class MediaConnectorI;
+typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
+typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
 
+//
+// Some types that are used throughout.
+//
 typedef pair<AsteriskSCF::Media::V1::StreamSinkPrx, AsteriskSCF::Media::V1::StreamSourcePrx> OutgoingPairing;
 typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::StreamSinkPrx> IncomingPairing;
 typedef vector<OutgoingPairing> OutgoingPairings;
 typedef vector<IncomingPairing> IncomingPairings;
 
-class MediaConnectorI;
-typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
-class MediaSplicerI;
-typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
-
+//
+// This structure acts as a sort of token for a list of media connection operations. These operations are performed
+// asynchronously, with helper classes used to move through each operation, effectively serializing the entire affair
+// while letting independent subtasks be performed independently.
+// 
 struct MediaConnectorBuilder : public IceUtil::Shared
 {
-public:
     AsteriskSCF::Media::V1::SessionPrx mediaSession;
     AsteriskSCF::Media::V1::StreamSourceSeq sources;
     AsteriskSCF::Media::V1::StreamSinkSeq sinks;
@@ -61,10 +72,10 @@ public:
 typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
 
 //
-// TODO: These proxies could use some retry properties added to them...
-// particularily for the media stream connect/disconnect operations.
+// An implementation of the MediaConnector interface. The details of "hooking up" media are all hidden behind this
+// interface. It's undergone several revisions in the past, some pretty hasty, so it probably warrants a refactoring
+// and a close look at its behavior.
 //
-
 class MediaConnectorI : public MediaConnector
 {
 public:
@@ -413,7 +424,8 @@ private:
     Logger mLogger;
 };
 
-QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
+QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
+        const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
 //
 // TODO: This needs to register the streams with an active threaded mixing element.
 //
@@ -428,6 +440,10 @@ public:
     {
     }
 
+    //
+    // This operation is called internally when the operations for initializing a MediaConnectorBuilder instance have
+    // completed.
+    //
     MediaConnectorPtr createConnector(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& data)
     {
         MediaConnectorIPtr connector(new MediaConnectorI(data, mBridgeId, session->id(), mReplicator, mLogger));
@@ -440,6 +456,10 @@ public:
         return connector;
     }
 
+    //
+    // Called through the external MediaSplicer interface, this initiates a series of operations to establish the media
+    // interconnections so a session can communicate on the bridge.
+    //
     void connect(const SessionWrapperPtr& session)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -469,7 +489,12 @@ public:
             existing = mSessions.back().connector;
             existing->clearConnections();
         }
-        ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, existing, this)));
+
+        //
+        // We do not bother tracking the executor for now. A tidy shutdown would probably want to clean this up.
+        // An alternative is to pass back the queued tasks to the caller and let them start and stop the process.
+        //
+        ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, existing, this), mLogger));
         taskExecutor->start();
     }
 
@@ -667,6 +692,7 @@ class GetMediaSession : public QueuedTask
 {
 public:
     GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("GetMediaSession"),
         mSession(session),
         mMaterials(materials)
     {
@@ -700,6 +726,7 @@ class GetSinks : public QueuedTask
 {
 public:
     GetSinks(const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("GetSinks"),
         mMaterials(materials)
     {
     }
@@ -730,6 +757,7 @@ class GetSources : public QueuedTask
 {
 public:
     GetSources(const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("GetSources"),
         mMaterials(materials)
     {
     }
@@ -761,6 +789,7 @@ class GetCompatiblePairings : public QueuedTask
 {
 public:
     GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("GetCompatiblePairings"),
         mSplicer(splicer),
         mMaterials(materials)
     {
@@ -788,6 +817,7 @@ class MakeConnections : public QueuedTask
 {
 public:
     MakeConnections(const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("MakeConnections"),
         mMaterials(materials),
         mIncomingPairings(mMaterials->incomingPairings.size(), 0),
         mOutgoingPairings(mMaterials->outgoingPairings.size(), 0),
@@ -966,6 +996,7 @@ class CreateAndRegisterConnector : public QueuedTask
 public:
     CreateAndRegisterConnector(const SessionWrapperPtr& session, const MediaSplicerIPtr& splicer,
             const MediaConnectorBuilderPtr& materials) :
+        QueuedTask("CreateAndRegisterConnector"),
         mSession(session),
         mSplicer(splicer),
         mMaterials(materials)
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index d7b4271..8688c4e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -102,6 +102,7 @@ class RemoveBridgeTask : public QueuedTask
 {
 public:
     RemoveBridgeTask(const SessionWrapperPtr& session, const SessionListenerPrx& listener) :
+        QueuedTask("RemoveBridgeTask"),
         mSession(session),
         mSessionListener(listener)
     {
@@ -123,6 +124,9 @@ protected:
 
     void failed(const Ice::Exception& ex)
     {
+        //
+        // TODO: Log exception.
+        //
         mListener->failed();
     }
 
@@ -150,6 +154,7 @@ class SessionStopTask : public QueuedTask
 public:
     SessionStopTask(const SessionWrapperPtr& session,
             const ResponseCodePtr& code) :
+        QueuedTask("SessionStopTask"),
         mSession(session),
         mCode(code)
     {
@@ -172,6 +177,9 @@ protected:
 
     void failed(const Ice::Exception&)
     {
+        //
+        // TODO: Log exception.
+        //
         mListener->failed();
     }
     
@@ -199,6 +207,7 @@ class SetStateTask : public QueuedTask
 public:
     SetStateTask(const SessionWrapperPtr& session,
             const AsteriskSCF::Bridge::V1::BridgedSessionState newState) :
+        QueuedTask("SetStateTask"),
         mSession(session),
         mState(newState)
     {
@@ -239,6 +248,9 @@ protected:
 
     void failed(const Ice::Exception&)
     {
+        //
+        // TODO: Log exception.
+        //
         mListener->failed();
     }
     
@@ -250,6 +262,7 @@ class ConnectMediaTask : public QueuedTask
 {
 public:
     ConnectMediaTask(const SessionWrapperPtr& session) :
+        QueuedTask("ConnectMediaTask"),
         mSession(session)
     {
     }
@@ -309,7 +322,7 @@ SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
     mLogger(logger),
     mId(mSession->key),
     mSplicer(splicer),
-    mActivities(new Executor)
+    mActivities(new Executor(mLogger))
 {
 }
 
@@ -482,13 +495,13 @@ string SessionWrapper::id()
 void SessionWrapper::setup()
 {
     mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
-    mActivities->append(new QueueableExecutor(createSetupTasks(this)));
+    mActivities->append(new QueueableExecutor(createSetupTasks(this), mLogger));
 }
 
 void SessionWrapper::shutdown(const SessionListenerPrx& listener, const ResponseCodePtr& code)
 {
     mLogger(Debug) << FUNLOG << ": beginning shutdown of " << mId;
-    QueuedTaskPtr shutdownRunner(new QueueableExecutor(createShutdownTasks(this, listener, code)));
+    QueuedTaskPtr shutdownRunner(new QueueableExecutor(createShutdownTasks(this, listener, code), mLogger));
     //
     // TODO: determine if the pending activites should be shutdown first.
     //
diff --git a/src/Tasks.h b/src/Tasks.h
index f046cbc..06cc867 100644
--- a/src/Tasks.h
+++ b/src/Tasks.h
@@ -18,6 +18,7 @@
 #include <Ice/Ice.h>
 #include <boost/thread/locks.hpp>
 #include <list>
+#include <AsteriskSCF/logger.h>
 
 namespace AsteriskSCF
 {
@@ -43,6 +44,7 @@ class QueuedTask : virtual public IceUtil::Shared
     void operator=(const QueuedTask&);
     
 public:
+    
     virtual ~QueuedTask() {}
     
     //
@@ -89,11 +91,25 @@ public:
         mListener = listener;
     }
 
+    std::string name()
+    {
+        return mName;
+    }
+    
 protected:
     TaskListenerPtr mListener;
-    
-    QueuedTask() {} // for those derived classes that don't require arguments.
+    std::string mName;
+
+    QueuedTask() :
+        mName("<unnamed>")
+    {
+    }
 
+    QueuedTask(const std::string& name) :
+        mName(name)
+    {
+    }
+    
     virtual bool executeImpl() = 0;
     
     //
@@ -113,13 +129,15 @@ typedef std::list<QueuedTaskPtr> QueuedTasks;
 class Executor : virtual public TaskListener
 {
 public:
-    Executor() :
+    Executor(const AsteriskSCF::System::Logging::Logger& logger) :
+        mLogger(logger),
         mStopped(true)
     {
     }
     
-    Executor(const QueuedTasks& tasks) :
+    Executor(const QueuedTasks& tasks, const AsteriskSCF::System::Logging::Logger& logger) :
         mTasks(tasks),
+        mLogger(logger),
         mStopped(true)
     {
         for (QueuedTasks::iterator i = mTasks.begin(); i != mTasks.end(); ++i)
@@ -130,19 +148,15 @@ public:
 
     void start()
     {
-        QueuedTaskPtr t;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
-            if (mStopped)
+            if (!mStopped)
             {
-                t = mTasks.front();
-                mStopped = false;
+                return;
             }
+            mStopped = false;
         }
-        if (t && t->execute())
-        {
-            succeeded();
-        }
+        succeeded();
     }
 
     void stop()
@@ -159,49 +173,39 @@ public:
     //
     void succeeded()
     {
-        while (mTasks.size() > 0)
+        QueuedTaskPtr current = popNextTask();
+        while (current)
         {
-            QueuedTaskPtr t;
-            {
-                boost::unique_lock<boost::shared_mutex> lock(mLock);
-                if (!mStopped)
-                {
-                    mTasks.pop_front();
-                    if (mTasks.size() == 0)
-                    {
-                        mStopped = true;
-                        break;
-                    }
-                    t = mTasks.front();
-                }
-            }
-            if (t)
+            mLogger(AsteriskSCF::System::Logging::Debug) << ": executing " << current->name();
+            
+            if (!current->execute())
             {
-                //
-                // TODO: this is too easy to get wrong, should revisit.
-                //
-                // If execute returns false, the queue should suspend until it gets started again
-                // by some outside agent. If a task returns true, it should not call succeeded.
-                //
-                if (!t->execute())
-                {
-                    break;
-                }
+                break;
             }
+            current = popNextTask();
         }
     }
+
+    QueuedTaskPtr popNextTask()
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mTasks.empty() || mStopped)
+        {
+            return 0;
+        }
+        QueuedTaskPtr t = mTasks.front();
+        mTasks.pop_front();
+        return t;
+    }
     
     void failed()
     {
-        while (mTasks.size() > 0)
+        QueuedTaskPtr current = popNextTask();
+        while (current)
         {
-            QueuedTaskPtr i;
-            {
-                boost::unique_lock<boost::shared_mutex> lock(mLock);
-                i = mTasks.front();
-                mTasks.pop_front();
-            }
-            i->fail();
+            mLogger(AsteriskSCF::System::Logging::Debug) << ": failing " << current->name();
+            current->fail();
+            current = popNextTask();
         }
     }
 
@@ -210,16 +214,12 @@ public:
         //
         // Note that a request that is in progress will not be destroyed.
         //
-        while (mTasks.size() > 0)
+        QueuedTaskPtr current = popNextTask();
+        while (current)
         {
-            
-            QueuedTaskPtr i;
-            {
-                boost::unique_lock<boost::shared_mutex> lock(mLock);
-                i = mTasks.front();
-                mTasks.pop_front();
-            }
-            i->destroy();
+            mLogger(AsteriskSCF::System::Logging::Debug) << ": destroying " << current->name();
+            current->destroy();
+            current = popNextTask();
         }
     }
 
@@ -249,6 +249,7 @@ public:
 protected:
     boost::shared_mutex mLock;
     QueuedTasks mTasks;
+    AsteriskSCF::System::Logging::Logger mLogger;
     bool mStopped;
 };
 typedef IceUtil::Handle<Executor> ExecutorPtr;
@@ -259,12 +260,13 @@ public:
     //
     // Useful for derived classes that can only do initialization in the constructor body.
     //
-    QueueableExecutor()
+    QueueableExecutor(const AsteriskSCF::System::Logging::Logger& logger) :
+        Executor(logger)
     {
     }
 
-    QueueableExecutor(const QueuedTasks& tasks) :
-        Executor(tasks)
+    QueueableExecutor(const QueuedTasks& tasks, const AsteriskSCF::System::Logging::Logger& logger) :
+        Executor(tasks, logger)
     {
     }
 

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list