[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Mar 14 10:00:43 CDT 2011


branch "async-bridging" has been updated
       via  f86a76ed0e55bd3c1ddacb80d87b8f8384d22ed8 (commit)
       via  ed3ad2fb7cbebc110e9f6e537b4348e07bdb91b8 (commit)
      from  14ba2849cb6bfb07c77d2d32512ea242fc9da87c (commit)

Summary of changes:
 src/BridgeImpl.cpp        |   30 +--
 src/MediaSplicer.cpp      |  657 +++++++++++++++++++++++++++++++++++---------
 src/MediaSplicer.h        |   44 +++-
 src/ServiceUtil.h         |   17 ++
 src/SessionListener.cpp   |   70 +++--
 src/SessionOperations.cpp |   49 +---
 src/SessionWrapper.cpp    |  355 +++++++++++++++++++++++-
 src/SessionWrapper.h      |   32 ++-
 8 files changed, 998 insertions(+), 256 deletions(-)


- Log -----------------------------------------------------------------
commit f86a76ed0e55bd3c1ddacb80d87b8f8384d22ed8
Author: Brent Eagles <beagles at digium.com>
Date:   Mon Mar 14 12:30:23 2011 -0230

    Modified several functions to be AMI.

diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 7b45558..5861bab 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -341,19 +341,7 @@ void BridgeImpl::removeSessions(const SessionSeq& sessions, const Ice::Current&
         SessionWrapperPtr session = mSessions->getSession(*i);
         if (session)
         {
-            try
-            {
-                //
-                // TODO: AMI.
-                //
-                (*i)->removeBridge(mSessionListenerPrx);
-            }
-            catch (const Ice::Exception& ex)
-            {
-                mLogger(Info) << ": removing the bridge from " << (*i) << " threw " 
-                              << ex.what();
-            }
-            session->disconnect();
+            session->shutdown(mSessionListenerPrx, new ResponseCode);
             removedSessions.push_back(*i);
         }
     }
@@ -546,20 +534,8 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
     {
         throw SessionNotFound(oldSession);
     }
-    
-    try
-    {
-        //
-        // TODO: AMI.
-        //
-        oldSession->removeBridge(mSessionListenerPrx);
-    }
-    catch (const Ice::Exception& ex)
-    {
-        mLogger(Info) << ": removingthe bridge from " << oldSession << " threw " 
-                      << ex.what();
-    }
-    session->disconnect();
+    session->shutdown(mSessionListenerPrx, new ResponseCode);
+
     SessionSeq removed;
     removed.push_back(oldSession);
     mListeners->sessionsRemoved(removed);
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 3268d98..83f9b0b 100644
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -20,13 +20,14 @@
 #include "BridgeReplicatorIf.h"
 #include <string>
 #include <vector>
+#include "ServiceUtil.h"
+#include "SessionWrapper.h"
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Bridge::V1;
 using namespace std;
 
-
 namespace AsteriskSCF
 {
 namespace BridgeService
@@ -39,6 +40,25 @@ class MedixMixerPtr;
 
 typedef pair<AsteriskSCF::Media::V1::StreamSinkPrx, AsteriskSCF::Media::V1::StreamSourcePrx> OutgoingPairing;
 typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::StreamSinkPrx> IncomingPairing;
+typedef vector<OutgoingPairing> OutgoingPairings;
+typedef vector<IncomingPairing> IncomingPairings;
+
+class MediaConnectorI;
+typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
+class MediaSplicerI;
+typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
+
+struct MediaConnectorBuilder : public IceUtil::Shared
+{
+public:
+    AsteriskSCF::Media::V1::SessionPrx mediaSession;
+    AsteriskSCF::Media::V1::StreamSourceSeq sources;
+    AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+    MediaConnectorIPtr peer;
+    OutgoingPairings outgoingPairings;
+    IncomingPairings incomingPairings;
+};
+typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
 
 //
 // TODO: These proxies could use some retry properties added to them...
@@ -48,17 +68,15 @@ typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::St
 class MediaConnectorI : public MediaConnector
 {
 public:
-    MediaConnectorI(const AsteriskSCF::Media::V1::SessionPrx& media,
-            const vector<OutgoingPairing>& outgoing, const vector<IncomingPairing>& incoming,
-            const MediaConnectorPtr& peer,
-            const string& bridgeId,
-            const string& sessionId,
+    MediaConnectorI(const MediaConnectorBuilderPtr& data,
+            const std::string& bridgeId,
+            const std::string& sessionId,
             const ReplicatorSmartPrx& replicator,
-            const Logger& logger):
-        mMedia(media),
-        mOutgoing(outgoing),
-        mIncoming(incoming),
-        mPeer(peer),
+            const Logger& logger) :
+        mMedia(data->mediaSession),
+        mOutgoing(data->outgoingPairings),
+        mIncoming(data->incomingPairings),
+        mPeer(data->peer), 
         mConnected(true),
         mBridgeId(bridgeId),
         mSessionId(sessionId),
@@ -66,19 +84,9 @@ public:
         mReplicator(replicator),
         mLogger(logger)
     {
-        mLogger(Debug) << FUNLOG << ": setting up a media connector with " << outgoing.size()
-                       << " outgoing pairings and " << incoming.size() << " incoming pairings.";
+        mLogger(Debug) << FUNLOG << ": setting up a media connector with " << mOutgoing.size()
+                       << " outgoing pairings and " << mIncoming.size() << " incoming pairings.";
         mKey = mBridgeId + mSessionId + "media";
-        for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
-        {
-            i->second->setSink(i->first);
-            i->first->setSource(i->second);
-        }
-        for (vector<IncomingPairing>::iterator i = mIncoming.begin(); i != mIncoming.end(); ++i)
-        {
-            i->first->setSink(i->second);
-            i->second->setSource(i->first);
-        }
     }
 
     MediaConnectorI(const SessionPairingPtr& pairing,
@@ -167,7 +175,8 @@ public:
         mIncoming.clear();
     }
 
-    void update(const MediaConnectorPtr& peer, const vector<OutgoingPairing>& outgoing, const std::vector<IncomingPairing>& incoming)
+    void update(const MediaConnectorPtr& peer, const OutgoingPairings& outgoing,
+            const IncomingPairings& incoming)
     {
         mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
         SessionPairingPtr update;
@@ -226,6 +235,78 @@ public:
         pushUpdate(createUpdate());
     }
 
+    //
+    // Like disconnect or unplug, but uses all oneway operations.
+    //
+    void destroy()
+    {
+        vector<OutgoingPairing> outgoing;
+        vector<IncomingPairing> incoming;
+        SessionPairingPtr update;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            outgoing = mOutgoing;
+            mOutgoing.clear();
+            incoming = mIncoming;
+            mIncoming.clear();
+            mConnected = false;
+            update = createUpdate();
+        }
+        pushUpdate(update);
+        if (outgoing.size() == 0 && incoming.size() == 0)
+        {
+            return;
+        }
+        mLogger(Debug) << FUNLOG << ": unplugging sinks and sources";
+
+        //
+        // Disconnect everybody, eating up exceptions in case things have gone away. This is a perfect spot
+        // for oneways. We don't care about errors when destroying. We still catch Ice::Exceptions though since
+        // a connection might be attempted, even on behalf of a oneway.
+        //
+        for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
+        {
+            mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and " << i->first->ice_toString();
+
+            try
+            {
+                tryOneWay(i->second)->setSink(0);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
+            try
+            {
+                tryOneWay(i->first)->setSource(0);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
+        }
+        for (vector<IncomingPairing>::iterator i = incoming.begin(); i != incoming.end(); ++i)
+        {
+            mLogger(Debug) << FUNLOG << ": disconnecting " << i->first->ice_toString() << " and " << i->second->ice_toString();
+            try
+            {
+                tryOneWay(i->first)->setSink(0);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
+            try
+            {
+                tryOneWay(i->second)->setSource(0);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+            }
+        }
+    }
+
     void update(const SessionPairingPtr& update)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -266,7 +347,7 @@ public:
 
         //
         // Disconnect everybody, eating up exceptions in case things have gone away. This is a perfect spot
-        // for oneways or, at the very least, AMI.
+        // for oneways or, at the very least, AMI. XXX
         //
         for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
         {
@@ -332,16 +413,13 @@ private:
     Logger mLogger;
 };
 
-typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
-
+QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
 //
 // TODO: This needs to register the streams with an active threaded mixing element.
 //
 class MediaSplicerI : public IceUtil::Shared
 {
-
 public:
-
     MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger) :
         mCommunicator(communicator),
         mBridgeId(bridgeId),
@@ -350,20 +428,23 @@ public:
     {
     }
 
-    MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
+    MediaConnectorPtr createConnector(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& data)
     {
-        AsteriskSCF::Media::V1::SessionPrx media = session->getMediaSession();
-        if (!media)
+        MediaConnectorIPtr connector(new MediaConnectorI(data, mBridgeId, session->id(), mReplicator, mLogger));
+        if (data->peer)
         {
-            mLogger(Debug) << FUNLOG << ": no media available!";
-            return 0;
+            data->peer->update(connector, data->outgoingPairings, data->incomingPairings);
         }
+        connector->initialUpdate();
+        mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+        return connector;
+    }
 
+    void connect(const SessionWrapperPtr& session)
+    {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
-        MediaConnectorIPtr result;
-
         //
-        // Loop through looking for an identical media session, also reaping disconnected connectors as we go.
+        // Reap.
         //
         MediaSessions::iterator i = mSessions.begin();
         while (i != mSessions.end())
@@ -375,58 +456,21 @@ public:
                 i = mSessions.erase(t);
                 continue;
             }
-            if (i->mediaSession == media)
-            {
-                result = MediaConnectorIPtr::dynamicCast(i->connector);
-                assert(result);
-            }
             ++i;
         }
-        if (result)
-        {
-            mLogger(Debug) << FUNLOG << ": found an existing matching connector, returning that!";
-            return result;
-        }
 
         //
         // PRE-ALPHA-ONLY code. When it's a two party bridge and the sessions are added or removed separately,
         // one of them is going to have an empty connector.
         //
-
-        MediaConnectorPtr existing;
+        MediaConnectorIPtr existing;
         if (mSessions.size() == 1)
         {
             existing = mSessions.back().connector;
             existing->clearConnections();
         }
-
-        //
-        // Idiom-wise, it is best to defer calling getSources() until after the sinks have all been
-        // processed. However, we do not want to do the sink linkup if the getSources fails so we
-        // do both calls up here.
-        //
-        StreamSinkSeq sinks = media->getSinks();
-        StreamSourceSeq sources = media->getSources();
-
-        vector<OutgoingPairing> outgoingPairings(findCompatiblePairings(sinks));
-        vector<IncomingPairing> incomingPairings(findCompatiblePairings(sources));
-
-        string sessionId = mCommunicator->identityToString(session->ice_getIdentity());
-        result = new MediaConnectorI(media, outgoingPairings, incomingPairings, existing, mBridgeId, sessionId, mReplicator, mLogger);
-        mLogger(Debug) << FUNLOG << ": established connections, returning connector object";
-        mSessions.push_back(MediaSessionStruct(media, result));
-        result->initialUpdate();
-
-        //
-        // PRE-ALPHA-ONLY code. When it's a two party bridge and the sessions are added or removed separately,
-        // one of them is going to have an empty connector. Now that we are adding the new connector
-        //
-        if (existing)
-        {
-            MediaConnectorIPtr p = MediaConnectorIPtr::dynamicCast(existing);
-            p->update(result, outgoingPairings, incomingPairings);
-        }
-        return result;
+        ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, existing, this)));
+        taskExecutor->start();
     }
 
     MediaConnectorPtr createReplica(const SessionPairingPtr& pairings)
@@ -458,6 +502,89 @@ public:
         return result;
     }
 
+    vector<OutgoingPairing> findCompatiblePairings(const StreamSinkSeq& sinks)
+    {
+        vector<StreamSourceSeq> allSources;
+
+        //
+        // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
+        // will come into a mix and go out.
+        //
+        for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+        {
+            allSources.push_back(i->mediaSession->getSources());
+        }
+
+        vector<OutgoingPairing> result;
+        for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+        {
+            bool connected = false;
+            for (vector<StreamSourceSeq>::iterator j = allSources.begin(); j != allSources.end() && !connected; ++j)
+            {
+                for (StreamSourceSeq::iterator k = j->begin(); k != j->end(); ++k)
+                {
+                    if (canAdapt(*i, *k))
+                    {
+                        result.push_back(OutgoingPairing(*i, *k));
+                        connected = true;
+                        StreamSourceSeq::iterator t = k;
+                        j->erase(t);
+                        break;
+                    }
+                }
+            }
+            if (!connected)
+            {
+                //
+                // TODO: We couldn't establish a proper connection for this sink. What happens
+                // here is probably a policy thing.
+                //
+            }
+        }
+        return result;
+    }
+
+    vector<IncomingPairing> findCompatiblePairings(const StreamSourceSeq& sources)
+    {
+        vector<StreamSinkSeq> allSinks;
+        //
+        // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
+        // will come into a mix and go out.
+        //
+        for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+        {
+            allSinks.push_back(i->mediaSession->getSinks());
+        }
+
+        vector<IncomingPairing> result;
+        for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+        {
+            bool connected = false;
+            for (vector<StreamSinkSeq>::iterator j = allSinks.begin(); j != allSinks.end() && !connected; ++j)
+            {
+                for (StreamSinkSeq::iterator k = j->begin(); k != j->end(); ++k)
+                {
+                    if (canAdapt(*i, *k))
+                    {
+                        result.push_back(IncomingPairing(*i, *k));
+                        StreamSinkSeq::iterator t = k;
+                        j->erase(t);
+                        connected = true;
+                        break;
+                    }
+                }
+            }
+            if (!connected)
+            {
+                //
+                // TODO: We couldn't establish a proper connection for this source! What happens now
+                // should be a matter of policy.
+                //
+            }
+        }
+        return result;
+    }
+    
 private:
 
     boost::shared_mutex mLock;
@@ -469,9 +596,9 @@ private:
     struct MediaSessionStruct
     {
         SessionPrx mediaSession;
-        MediaConnectorPtr connector;
+        MediaConnectorIPtr connector;
 
-        MediaSessionStruct(const SessionPrx& session, const MediaConnectorPtr& con) :
+        MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
             mediaSession(session),
             connector(con)
         {
@@ -532,82 +659,346 @@ private:
         //
         return true;
     }
+};
 
-    vector<OutgoingPairing> findCompatiblePairings(const StreamSinkSeq& sinks)
+typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
+
+class GetMediaSession : public QueuedTask
+{
+public:
+    GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+        mSession(session),
+        mMaterials(materials)
     {
-        vector<StreamSourceSeq> allSources;
-        for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mSession->getSession()->begin_getMediaSession(
+            AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
+                        &GetMediaSession::done, &GetMediaSession::failed));
+        return false;
+    }
+
+    void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+    {
+        mMaterials->mediaSession = mediaSession;
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception& ex)
+    {
+        mListener->failed();
+    }
+private:
+    SessionWrapperPtr mSession;
+    MediaConnectorBuilderPtr mMaterials;
+};
+
+class GetSinks : public QueuedTask
+{
+public:
+    GetSinks(const MediaConnectorBuilderPtr& materials) :
+        mMaterials(materials)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
+                        &GetSinks::done, &GetSinks::failed));
+        return false;
+    }
+
+    void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+    {
+        mMaterials->sinks = sinks;
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception&)
+    {
+        mListener->failed();
+    }
+private:
+    MediaConnectorBuilderPtr mMaterials;
+};
+
+class GetSources : public QueuedTask
+{
+public:
+    GetSources(const MediaConnectorBuilderPtr& materials) :
+        mMaterials(materials)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
+                        &GetSources::done, &GetSources::failed));
+        return false;
+    }
+
+    void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+    {
+        mMaterials->sources = sources;
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception&)
+    {
+        mListener->failed();
+    }
+    
+private:
+    MediaConnectorBuilderPtr mMaterials;
+};
+
+class GetCompatiblePairings : public QueuedTask
+{
+public:
+    GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials) :
+        mSplicer(splicer),
+        mMaterials(materials)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->sources);
+        mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->sinks);
+        return true;
+    }
+
+private:
+    MediaSplicerIPtr mSplicer;
+    MediaConnectorBuilderPtr mMaterials;
+};
+
+//
+// Handles setting up the connections using AMI. A little tricky to do in a minimal way, so enjoy.  NOTE: It might
+// appear that this class is doing a little more bookkeeping than would be "minimal", but I've added a fair bit of
+// tracking with the hopes of implementing some rudimentary rollback in the future.
+//
+class MakeConnections : public QueuedTask
+{
+public:
+    MakeConnections(const MediaConnectorBuilderPtr& materials) :
+        mMaterials(materials),
+        mIncomingPairings(mMaterials->incomingPairings.size(), 0),
+        mOutgoingPairings(mMaterials->outgoingPairings.size(), 0),
+        mTotalCount(mIncomingPairings.size() + mOutgoingPairings.size())
+    {
+    }
+
+protected:
+
+    //
+    // Incoming and outgoing pairings follow the same logic, but use different types. The best way to code that kind of
+    // thing is with templates!
+    //
+    template <class T>
+    class PairingHelper : public IceUtil::Shared
+    {
+    public:
+        PairingHelper(const IceUtil::Handle<MakeConnections>& task, size_t position) :
+            mTask(task),
+            mPosition(position),
+            mSourceFlag(false),
+            mSinkFlag(false)
         {
-            allSources.push_back(i->mediaSession->getSources());
         }
 
-        vector<OutgoingPairing> result;
-        for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+        void setSinkCB()
         {
-            bool connected = false;
-            for (vector<StreamSourceSeq>::iterator j = allSources.begin(); j != allSources.end() && !connected; ++j)
+            bool other = false;
             {
-                for (StreamSourceSeq::iterator k = j->begin(); k != j->end(); ++k)
-                {
-                    if (canAdapt(*i, *k))
-                    {
-                        result.push_back(OutgoingPairing(*i, *k));
-                        connected = true;
-                        StreamSourceSeq::iterator t = k;
-                        j->erase(t);
-                        break;
-                    }
-                }
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                mSinkFlag = true;
+                other = mSourceFlag;
             }
-            if (!connected)
+            if (other)
             {
-                //
-                // TODO: We couldn't establish a proper connection for this sink. What happens
-                // here is probably a policy thing.
-                //
+                done();
             }
         }
-        return result;
-    }
 
-    vector<IncomingPairing> findCompatiblePairings(const StreamSourceSeq& sources)
+        void setSourceCB()
+        {
+            bool other = false;
+            {
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
+                mSourceFlag = true;
+                other = mSinkFlag;
+            }
+            if (other)
+            {
+                done();
+            }
+        }
+
+        void failed(const Ice::Exception& ex)
+        {
+            mTask->failed(ex, mPosition);
+        }
+
+        void done();
+
+    private:
+
+        IceUtil::Handle<MakeConnections> mTask;
+        boost::shared_mutex mLock;
+        size_t mPosition;
+        bool mSourceFlag;
+        bool mSinkFlag;
+    };
+    
+    typedef PairingHelper<OutgoingPairing> OutgoingHelper;
+    typedef PairingHelper<IncomingPairing> IncomingHelper;
+    typedef IceUtil::Handle<OutgoingHelper> OutgoingHelperPtr;
+    typedef IceUtil::Handle<IncomingHelper> IncomingHelperPtr;
+    
+    bool executeImpl()
     {
-        vector<StreamSinkSeq> allSinks;
-        for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+        //
+        // If there are lots of pairings, this could result in a "flurry" of AMI requests. I'm not sure that it is
+        // anything to worry about though. Ice will queue the requests and send them according to the "send" threads
+        // available.
+        //
+        size_t index = 0;
+        for (OutgoingPairings::iterator i = mMaterials->outgoingPairings.begin(); i != mMaterials->outgoingPairings.end(); ++i)
         {
-            allSinks.push_back(i->mediaSession->getSinks());
+            OutgoingHelperPtr h(new OutgoingHelper(this, index));
+            i->second->begin_setSink(i->first, AsteriskSCF::Media::V1::newCallback_StreamSource_setSink(h, &OutgoingHelper::setSinkCB, &OutgoingHelper::failed));
+            i->first->begin_setSource(i->second, AsteriskSCF::Media::V1::newCallback_StreamSink_setSource(h, &OutgoingHelper::setSourceCB, &OutgoingHelper::failed));
+            ++index;
         }
+        index = 0;
+        for (IncomingPairings::iterator i = mMaterials->incomingPairings.begin(); i != mMaterials->incomingPairings.end(); ++i)
+        {
+            IncomingHelperPtr h(new IncomingHelper(this, index));
+            i->first->begin_setSink(i->second,  AsteriskSCF::Media::V1::newCallback_StreamSource_setSink(h, &IncomingHelper::setSinkCB, &IncomingHelper::failed));
+            i->second->begin_setSource(i->first,  AsteriskSCF::Media::V1::newCallback_StreamSink_setSource(h, &IncomingHelper::setSourceCB, &IncomingHelper::failed));
+            ++index;
+        }
+        return false;
+    }
 
-        vector<IncomingPairing> result;
-        for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+    void incoming(size_t position)
+    {
+        bool done = false;
         {
-            bool connected = false;
-            for (vector<StreamSinkSeq>::iterator j = allSinks.begin(); j != allSinks.end() && !connected; ++j)
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            if (mIncomingPairings.at(position) == 0)
             {
-                for (StreamSinkSeq::iterator k = j->begin(); k != j->end(); ++k)
-                {
-                    if (canAdapt(*i, *k))
-                    {
-                        result.push_back(IncomingPairing(*i, *k));
-                        StreamSinkSeq::iterator t = k;
-                        j->erase(t);
-                        connected = true;
-                        break;
-                    }
-                }
+                mIncomingPairings.at(position) = 1;
+                ++mCurrentCount;
+                done = (mCurrentCount == mTotalCount);
             }
-            if (!connected)
+        }
+        if (done)
+        {
+            mListener->succeeded();
+        }
+    }
+
+    void outgoing(size_t position)
+    {
+        bool done = false;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            if (mOutgoingPairings.at(position) == 0)
             {
-                //
-                // TODO: We couldn't establish a proper connection for this source! What happens now
-                // should be a matter of policy.
-                //
+                mOutgoingPairings.at(position) = 1;
+                ++mCurrentCount;
+                done = (mCurrentCount == mTotalCount);
             }
         }
-        return result;
+        if (done)
+        {
+            mListener->succeeded();
+        }
+    }
+
+    void failed(const Ice::Exception& ex, size_t position)
+    {
+        //
+        // TODO: Do something special here probably. Its not clear that the whole operation should failed if a single
+        // pairing out of the whole group failed. At the very least, there should be a rollback.
+        //
+        mListener->failed();
+    }
+
+private:
+    boost::shared_mutex mLock;
+    MediaConnectorBuilderPtr mMaterials;
+
+    //
+    // Keeping track of the responses from incoming and outgoing pairings allow the total count to be used as a finished
+    // flag even in the event of duplicate returns. And as already indicated the mTotalCount lets us know when its time
+    // to say we are done.
+    //
+    Ice::IntSeq mIncomingPairings;
+    Ice::IntSeq mOutgoingPairings;
+    size_t mCurrentCount;
+    const size_t mTotalCount;
+};
+
+//
+// Specializations so the connections helper can keep track of the different sets of pairings/.
+//
+template <> void MakeConnections::PairingHelper<OutgoingPairing>::done()
+{
+    mTask->outgoing(mPosition);
+}
+
+template <> void MakeConnections::PairingHelper<IncomingPairing>::done()
+{
+    mTask->incoming(mPosition);
+}
+
+class CreateAndRegisterConnector : public QueuedTask
+{
+public:
+    CreateAndRegisterConnector(const SessionWrapperPtr& session, const MediaSplicerIPtr& splicer,
+            const MediaConnectorBuilderPtr& materials) :
+        mSession(session),
+        mSplicer(splicer),
+        mMaterials(materials)
+    {
     }
+
+    bool executeImpl()
+    {
+        MediaConnectorPtr connector(mSplicer->createConnector(mSession, mMaterials));
+        mSession->setConnector(connector);
+        return true;
+    }
+    
+private:
+    SessionWrapperPtr mSession;
+    MediaSplicerIPtr mSplicer;
+    MediaConnectorBuilderPtr mMaterials;
 };
 
+QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer)
+{
+    QueuedTasks tasks;
+    MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
+    materials->peer = peer;
+    tasks.push_front(new GetMediaSession(session, materials));
+    tasks.push_front(new GetSinks(materials));
+    tasks.push_front(new GetSources(materials));
+    tasks.push_front(new GetCompatiblePairings(splicer, materials));
+    tasks.push_front(new MakeConnections(materials));
+    tasks.push_front(new CreateAndRegisterConnector(session, splicer, materials));
+    return tasks;
+}
+
 } // End of namespace BridgeService
 } // End of namespace AsteriskSCF
 
@@ -619,11 +1010,7 @@ MediaSplicer::MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string&
 {
 }
 
-MediaSplicer::~MediaSplicer()
-{
-}
-
-MediaConnectorPtr MediaSplicer::connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
+void MediaSplicer::connect(const SessionWrapperPtr& session)
 {
     return mImpl->connect(session);
 }
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index 08443d8..65108f1 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -27,36 +27,66 @@ namespace AsteriskSCF
 {
 namespace BridgeService
 {
+
+//
+// Forward declarations.
+//
+class SessionWrapper;
+typedef IceUtil::Handle<SessionWrapper> SessionWrapperPtr;
+
+/**
+ *
+ * Interface for MediaConnector objects.
+ *
+ **/
 class MediaConnector : public IceUtil::Shared
 {
 public:
     virtual ~MediaConnector() {}
+
+    /**
+     * Disconnects the media pairings associated with this connector. It will involve the
+     * peer media connector, if one exists.
+     **/
     virtual void unplug() = 0;
+    
     virtual bool isConnected() = 0;
+
+    /**
+     * Simply "forget" the pairings. Used when this connector's complement has been disconnect already.
+     **/
     virtual void clearConnections() = 0;
+    
+    /**
+     * Nicely disconnect the media pairings owned by this connector.
+     **/
     virtual bool disconnectMedia() = 0;
+    
+    /**
+     * Best effort, fire-forget disconnect the media pairings.
+     **/
+    virtual void destroy() = 0;
 
+    /**
+     * Replication support function.
+     **/
     virtual void update(const AsteriskSCF::Bridge::V1::SessionPairingPtr& update) = 0;
 };
-
 typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
-
+ 
 class MediaSplicerI;
-
 class MediaSplicer : public IceUtil::Shared
 {
 public:
     MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
             const AsteriskSCF::System::Logging::Logger& logger);
-    ~MediaSplicer();
-    MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
-
+    
+    void connect(const SessionWrapperPtr& session);
     MediaConnectorPtr createReplica(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
 
 private:
     boost::shared_ptr<MediaSplicerI> mImpl;
 };
-
 typedef IceUtil::Handle<MediaSplicer> MediaSplicerPtr;
 
 } // End of namespace BridgeService
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 280c49e..c5a61b3 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -57,8 +57,12 @@ public:
                 return;
             }
 
+#if 0
             session->setConnected();
             session->setupMedia();
+#endif
+            session->setup();
+            
             ConnectSessionOperation connector(source, mLogger);
             mSessions->visitSessions(connector);
         }
@@ -71,19 +75,31 @@ public:
 
     void flashed(const SessionPrx& source, const Ice::Current&)
     {
+        //
+        // TODO: Is there something to be done here?
+        // 
     }
 
     void held(const SessionPrx& source, const Ice::Current&)
     {
+        //
+        // TODO: Should this affect the media operations.
+        // 
     }
 
     void progressing(const SessionPrx& source, 
       const ResponseCodePtr& response, const Ice::Current&)
     {
+        //
+        // TODO: Is there something to be done here?
+        // 
     }
 
     void ringing(const SessionPrx& source, const Ice::Current&)
     {
+        //
+        // TODO: Who gets the ring notifications will likely depend on configuration, etc.
+        //
         RingSessionOperation ringer(source, mLogger);
         mSessions->visitSessions(ringer);
     }
@@ -93,34 +109,38 @@ public:
         string proxyString = source->ice_toString();
         mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
 
+        //
+        // IMPLNOTE: The AMI approach.
+        // Stack some operations in a callback. In this case it will be:
+        // 1. getSession()
+        // 2. removeBridge()
+        // 3. disconnect() or destroy()
+        //
+        // The latter two should be coupled as a transaction. One way to go about this would be to replicate the
+        // operations that need to be done together and checkpoint after each one completes. Actually, there are lots of
+        // ways to go about it. Need to think on this more.
+        //
+        // Q. What is the proper order of operations here, should removeBridge be called before or after we
+        // "disconnect" the session? 
+        //
         SessionWrapperPtr session;
-        try
-        {
-            session = mSessions->getSession(source);
-            if (!session)
-            {
-                mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
-                              << proxyString << " that does not match known sessions. Possible out of order operations.";
-                return;
-            }
-
-            //
-            // We don't actually have the proxy for this object, but we can create one on the fly.
-            //
-            SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
-            source->removeBridge(listenerPrx);
-            session->disconnect();
-        }
-        catch (const Ice::ObjectNotExistException& ex)
+        session = mSessions->getSession(source);
+        if (!session)
         {
-            session->destroy();
-            mLogger(Debug) << FUNLOG << ": " << ex.what();
-        }
-        catch (const Ice::Exception& ex)
-        {
-            mLogger(Debug) << FUNLOG << ": " << ex.what();
-            throw;
+            mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+                          << proxyString << " that does not match known sessions. Possible out of order operations.";
+            return;
         }
+
+        //
+        // We don't actually have the proxy for this object, but we can create one on the fly.
+        //
+        SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
+
+        //
+        // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
+        //
+        session->shutdown(listenerPrx, new ResponseCode);
     }
 
     void unheld(const SessionPrx& source, const Ice::Current&)
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 279bc96..4ae2ab9 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -33,26 +33,10 @@ void ConnectSessionOperation::operator()(const SessionWrapperPtr& s)
 {
     if (s->getSession()->ice_getIdentity() != mExclude)
     {
-        try
-        {
-            //
-            // TODO: AMI!
-            //
-            s->connect();
-        }
-        catch (const Ice::ObjectNotExistException& ex)
-        {
-            //
-            // If the object has indeed been destroyed, we tell the wrapper that it should
-            // go away and it takes care of making sure that the right things happen.
-            //
-            s->destroy();
-            mLogger(Debug) << FUNLOG << ": " << ex.what();
-        }
-        catch (const Ice::Exception& ex)
-        {
-            mLogger(Debug) << FUNLOG << ": " << ex.what();
-        }
+        //
+        // This is ultimately implemented as an AMI request, so this will return pretty much immediately.
+        //
+        s->connect();
     }
 }
 
@@ -67,28 +51,7 @@ ShutdownSessionOperation::ShutdownSessionOperation(const SessionListenerPrx& lis
 
 void ShutdownSessionOperation::operator()(const SessionWrapperPtr& wrapper)
 {
-    SessionPrx s = wrapper->getSession();
-    try
-    {
-        //
-        // TODO: Make AMI.
-        //
-        s->removeBridge(mListener);
-        wrapper->disconnect();
-        s->stop(mResponse);
-    }
-    catch (const Ice::ObjectNotExistException& ex)
-    {
-        //
-        // Just make sure we are being cleaned up properly.
-        //
-        wrapper->destroy();
-        mLogger(Debug) << FUNLOG << ": " << ex.what();
-    }
-    catch (const Ice::Exception& ex)
-    {
-        mLogger(Debug) << FUNLOG << ": " << ex.what();
-    }
+    wrapper->shutdown(mListener, mResponse);
 }
 
 RingSessionOperation::RingSessionOperation(const SessionPrx& exclude, const Logger& logger) :
@@ -105,7 +68,7 @@ void RingSessionOperation::operator()(const SessionWrapperPtr& session)
         try
         {
             //
-            // TODO: AMI!
+            // TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
             //
             s->ring();
         }
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 5c8b13f..7c618c9 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -19,8 +19,6 @@
 #include <AsteriskSCF/logger.h>
 #include "ServiceUtil.h"
 
-#include <boost/thread/once.hpp>
-
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
 using namespace AsteriskSCF::BridgeService;
@@ -74,6 +72,7 @@ public:
             // XXX- it might be reasonable to disconnect this session for this!
             //
             mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+            mSession->destroy();
         }
         catch (const Ice::SocketException& ex)
         {
@@ -89,12 +88,214 @@ public:
             //
             mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
         }
-    }
+   }
     
 private:
     SessionWrapperPtr mSession;
     Logger mLogger;
 };
+
+//
+// Micro tasks
+//
+class RemoveBridgeTask : public QueuedTask
+{
+public:
+    RemoveBridgeTask(const SessionWrapperPtr& session, const SessionListenerPrx& listener) :
+        mSession(session),
+        mSessionListener(listener)
+    {
+    }
+    
+protected:
+    bool executeImpl()
+    {
+        mSession->getSession()->begin_removeBridge(mSessionListener,
+                newCallback_Session_removeBridge(this, &RemoveBridgeTask::removed,
+                        &RemoveBridgeTask::failed));
+        return false;
+    }
+
+    void removed()
+    {
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception& ex)
+    {
+        mListener->failed();
+    }
+
+    void failImpl()
+    {
+        //
+        // NO-OP
+        //
+    }
+
+    void destroyImpl()
+    {
+        //
+        // NO-OP
+        //
+    }
+
+private:
+    SessionWrapperPtr mSession;
+    SessionListenerPrx mSessionListener;
+};
+
+class SessionStopTask : public QueuedTask
+{
+public:
+    SessionStopTask(const SessionWrapperPtr& session,
+            const ResponseCodePtr& code) :
+        mSession(session),
+        mCode(code)
+    {
+    }
+    
+protected:
+    bool executeImpl()
+    {
+
+        mSession->getSession()->begin_stop(mCode,
+            newCallback_Session_stop(this, &SessionStopTask::removed,
+                    &SessionStopTask::failed));
+        return false;
+    }
+
+    void removed()
+    {
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception&)
+    {
+        mListener->failed();
+    }
+    
+    void failImpl()
+    {
+        //
+        // NO-OP
+        //
+    }
+
+    void destroyImpl()
+    {
+        //
+        // NO-OP
+        //
+    }
+    
+private:
+    SessionWrapperPtr mSession;
+    ResponseCodePtr mCode; 
+};
+
+class SetStateTask : public QueuedTask
+{
+public:
+    SetStateTask(const SessionWrapperPtr& session,
+            const AsteriskSCF::Bridge::V1::BridgedSessionState newState) :
+        mSession(session),
+        mState(newState)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mSession->setState(mState);
+        return true;
+    }
+
+private:
+    SessionWrapperPtr mSession;
+    AsteriskSCF::Bridge::V1::BridgedSessionState mState;
+    AsteriskSCF::Bridge::V1::BridgedSessionState mOldState;
+};
+
+class ShutdownMediaTask : public QueuedTask
+{
+public:
+    ShutdownMediaTask(const SessionWrapperPtr& session) :
+        mSession(session)
+    {
+    }
+    
+protected:
+    bool executeImpl()
+    {
+        mSession->unplugMedia();
+        return false;
+    }
+
+    void done()
+    {
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception&)
+    {
+        mListener->failed();
+    }
+    
+private:
+    SessionWrapperPtr mSession;
+};
+
+class ConnectMediaTask : public QueuedTask
+{
+public:
+    ConnectMediaTask(const SessionWrapperPtr& session) :
+        mSession(session)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        mSession->setupMedia();
+        return true;
+    }
+
+    void done()
+    {
+        mListener->succeeded();
+    }
+
+    void failed(const Ice::Exception&)
+    {
+        mListener->failed();
+    }
+private:
+    SessionWrapperPtr mSession;
+};
+
+QueuedTasks createShutdownTasks(const SessionWrapperPtr& session, const SessionListenerPrx& listener,
+        const ResponseCodePtr& code)
+{
+    //
+    // Tasks are a queued, so they go in order that they will be processed.
+    //
+    QueuedTasks tasks;
+    tasks.push_front(new SetStateTask(session, BridgedSessionState::Disconnected));
+    tasks.push_front(new RemoveBridgeTask(session, listener));
+    tasks.push_front(new ShutdownMediaTask(session));
+    tasks.push_front(new SessionStopTask(session, code));
+    return tasks;
+}
+
+QueuedTasks createSetupTasks(const SessionWrapperPtr& session)
+{
+    QueuedTasks tasks;
+    tasks.push_front(new SetStateTask(session, BridgedSessionState::Connected));
+    tasks.push_front(new ConnectMediaTask(session));
+    return tasks;
+}
+
 }
 
 SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
@@ -104,7 +305,8 @@ SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
     mReplicator(replicator),
     mLogger(logger),
     mId(mSession->key),
-    mSplicer(splicer)
+    mSplicer(splicer),
+    mActivities(new Executor)
 {
 }
 
@@ -128,7 +330,7 @@ void SessionWrapper::connect()
         mSession->session->begin_connect(
             newCallback_Session_connect(new ConnectRequestCallback(this, mLogger),
                     &ConnectRequestCallback::connectCB,
-                    &ConnectRequestCallback::failureCB), this);
+                    &ConnectRequestCallback::failureCB));
     }
 }
 
@@ -136,7 +338,7 @@ void SessionWrapper::ring()
 {
     {
         boost::shared_lock<boost::shared_mutex> lock(mLock);
-        if (mSession->currentState == BridgedSessionState::Connected)
+        if (mSession->currentState != BridgedSessionState::Added)
         {
             return;
         }
@@ -180,10 +382,7 @@ void SessionWrapper::setupMedia()
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         if (mSession->currentState == BridgedSessionState::Added)
         {
-            if (!mConnector)
-            {
-                mConnector = mSplicer->connect(mSession->session);
-            }
+            mSplicer->connect(this);
             mSession->currentState = BridgedSessionState::Connected;
             update = createUpdate();
         }
@@ -191,6 +390,15 @@ void SessionWrapper::setupMedia()
     pushUpdate(update);
 }
 
+void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
+{
+    mLogger(Debug) << FUNLOG << " for " << mId;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mConnector = connector;
+    }
+}
+
 void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
 {
     mLogger(Debug) << FUNLOG << " for " << mId;
@@ -245,6 +453,10 @@ void SessionWrapper::destroy()
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         if (mSession->currentState != BridgedSessionState::Done)
         {
+            //
+            // TODO: This should be a best effort attempt to unplug the media. This might mean some
+            // oneways along with some exception wherever the disconnections are actually taking place.
+            //
             unplugMedia();
             mSession->currentState = BridgedSessionState::Done;
             update = createUpdate();
@@ -264,14 +476,39 @@ string SessionWrapper::id()
     return mId;
 }
 
-void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+void SessionWrapper::setup()
 {
-    if (update && mReplicator)
+    mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
+    mActivities->append(new QueueableExecutor(createSetupTasks(this)));
+}
+
+void SessionWrapper::shutdown(const SessionListenerPrx& listener, const ResponseCodePtr& code)
+{
+    mLogger(Debug) << FUNLOG << ": beginning shutdown of " << mId;
+    QueuedTaskPtr shutdownRunner(new QueueableExecutor(createShutdownTasks(this, listener, code)));
+    //
+    // TODO: determine if the pending activites should be shutdown first.
+    //
+    mActivities->append(shutdownRunner);
+    //
+    // If shutdown is going to preempt stuff we need to uncomment this.
+    //
+    // shutdownRunner->start();
+}
+
+void SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
+{
+    mLogger(Debug) << FUNLOG << ": updating state " << mId;
+    BridgedSessionPtr update;
     {
-        ReplicatedStateItemSeq seq;
-        seq.push_back(update);
-        mReplicator->setState(seq);
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        //
+        // TODO:
+        //
+        mSession->currentState = newState;
+        update = createUpdate();
     }
+    pushUpdate(update);
 }
 
 void SessionWrapper::unplugMedia()
@@ -289,6 +526,16 @@ void SessionWrapper::unplugMedia()
     }
  }
 
+void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+{
+    if (update && mReplicator)
+    {
+        ReplicatedStateItemSeq seq;
+        seq.push_back(update);
+        mReplicator->setState(seq);
+    }
+}
+
 BridgedSessionPtr SessionWrapper::createUpdate()
 {
     //
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index c63fdb4..52ad6cd 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -20,6 +20,7 @@
 #include <AsteriskSCF/logger.h>
 #include <boost/thread/locks.hpp>
 #include "MediaSplicer.h"
+#include "Tasks.h"
 
 namespace AsteriskSCF
 {
@@ -85,6 +86,7 @@ public:
      * Initiates replication.
      **/
     void setupMedia();
+    void setConnector(const MediaConnectorPtr& connector);
     void updateMedia(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
 
     /**
@@ -111,6 +113,27 @@ public:
     bool isDestroyed();
 
     std::string id();
+
+    //
+    // Large scale macro operations. In effect the wrapper becomes a sub-component within the bridge.
+    //
+    // TODO: Should replace the more granular operations in the interface.
+    //
+    void setup();
+    
+    void shutdown(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+            const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& code);
+
+
+    //
+    // TODO: Refactor so these methods don't need to be exposed.
+    //
+    void setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState);
+
+    /**
+     * Disconnection helper.
+     **/
+    void unplugMedia();
     
 private:
 
@@ -121,17 +144,14 @@ private:
     AsteriskSCF::System::Logging::Logger mLogger;
     std::string mId;
     MediaSplicerPtr mSplicer;
-
+    ExecutorPtr mActivities;
+    
     /**
      * Sends changes to the replication service. This should never occur
      * unless the host service is active.
      **/
     void pushUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& update);
 
-    /**
-     * Disconnection helper.
-     **/
-    void unplugMedia();
 
     AsteriskSCF::Bridge::V1::BridgedSessionPtr createUpdate();
 };

commit ed3ad2fb7cbebc110e9f6e537b4348e07bdb91b8
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Mar 10 11:12:08 2011 -0330

    Make Sesson:connected() calls AMI.

diff --git a/src/ServiceUtil.h b/src/ServiceUtil.h
index 36610b9..f2b4917 100644
--- a/src/ServiceUtil.h
+++ b/src/ServiceUtil.h
@@ -170,4 +170,21 @@ private:
     size_t mCounter;
 };
 
+//
+// Helper template to try and create a one way proxy. If the endpoint configuration on the proxy will not allow
+// oneways, the original proxy is returned.
+//
+template <typename PrxType>
+PrxType tryOneWay(const PrxType& orig)
+{
+    try
+    {
+        return PrxType::uncheckedCast(orig->ice_oneway());
+    }
+    catch (const Ice::NoEndpointException& ex)
+    {
+    }
+    return orig;
+}
+
 }; /** End of namespace AsteriskSCF */
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 21bca8d..5c8b13f 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -17,6 +17,9 @@
 #include "SessionWrapper.h"
 #include <Ice/Ice.h>
 #include <AsteriskSCF/logger.h>
+#include "ServiceUtil.h"
+
+#include <boost/thread/once.hpp>
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
@@ -25,6 +28,75 @@ using namespace AsteriskSCF::Bridge::V1;
 using namespace AsteriskSCF;
 using namespace std;
 
+namespace
+{
+//
+// NOTE: This object is used in support of making connect() calls on sessions via AMI. It's a fair candidate for a
+// singleton as you could pass the session as a cookie to the AMI call. However, it is desirable to have some logging,
+// and initializating the singleton from the proper logger instance is a little problematic to do in a fashion
+// consistent with how the rest of the service uses a logger. An alternative would be to have the actual implementation
+// of the callbacks be implemented by the SessionWrapper, which already has the logger instance. I will leave this as is
+// for now as introducing callback interfaces on the wrapper for AMI callback purposes seems overkill for the moment. So
+// perhaps this should be labelled REVISIT.
+//
+class ConnectRequestCallback : public IceUtil::Shared
+{
+public:
+    ConnectRequestCallback(const SessionWrapperPtr& session, const Logger& logger):
+        mSession(session),
+        mLogger(logger)
+    {
+    }
+
+    void connectCB()
+    {
+        //
+        // There isn't rally anything to do on success.
+        //
+        mLogger(Debug) << "connect call on " << mSession->id() << " succeeded.";
+    }
+
+    void failureCB(const Ice::Exception& ex)
+    {
+        //
+        // Failure is a little more interesting. We should really retry on some exceptions, and
+        // auto-disconnect the session for others.
+        //
+        try
+        {
+            ex.ice_throw();
+        }
+        catch (const Ice::RequestFailedException& ex)
+        {
+            //
+            // These exceptions indicate something is wrong with the request on this object. We should not expect that
+            // it could ever exceed. ObjectNotExistException is included in this catch.
+            // XXX- it might be reasonable to disconnect this session for this!
+            //
+            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+        }
+        catch (const Ice::SocketException& ex)
+        {
+            //
+            // A possible retry scenario.
+            //
+            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+        }
+        catch (const std::exception& ex)
+        {
+            //
+            // Not much to be done here except log the error and move on.
+            //
+            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+        }
+    }
+    
+private:
+    SessionWrapperPtr mSession;
+    Logger mLogger;
+};
+}
+
 SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
         const MediaSplicerPtr& splicer, const ReplicatorSmartPrx& replicator,
         const Logger& logger) :
@@ -46,7 +118,17 @@ void SessionWrapper::connect()
 {
     if (setConnected())
     {
-        mSession->session->connect();
+        //
+        // A word about expected context. In a conference bridge, it is quite possible that multiple sessions will have
+        // connect() called on them "at once". By using AMI to send these requests out, we free the
+        // SessionWrapper::connect() caller alone to proceed. In a sense, each individual session wrapper instance takes
+        // care of dealing with the AMI callbacks. More accurately, there is a class wide singleton for the callback
+        // object and the SessionWrapper itself is passed as the cookie. No further "book-keeping" is required.
+        //
+        mSession->session->begin_connect(
+            newCallback_Session_connect(new ConnectRequestCallback(this, mLogger),
+                    &ConnectRequestCallback::connectCB,
+                    &ConnectRequestCallback::failureCB), this);
     }
 }
 
@@ -59,7 +141,7 @@ void SessionWrapper::ring()
             return;
         }
     }
-    mSession->session->ring();
+    tryOneWay(mSession->session)->ring();
 }
 
 bool SessionWrapper::setConnected()
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index e11da7c..c63fdb4 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -26,7 +26,7 @@ namespace AsteriskSCF
 namespace BridgeService
 {
 
-class SessionWrapper : public IceUtil::Shared
+class SessionWrapper : public Ice::LocalObject
 {
 public:
     SessionWrapper(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& session,

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list