[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri Sep 10 11:12:09 CDT 2010


branch "master" has been updated
       via  1f827738e764ef4a09b669972bc65f4495373d20 (commit)
       via  9edb8c0f573e555691f058d73767ac0be4797aa3 (commit)
      from  ece0b81c0959c8e901bc28b271a93ad2ae38433c (commit)

Summary of changes:
 CMakeLists.txt            |    2 +-
 cmake                     |    2 +-
 slice                     |    2 +-
 src/BridgeFactoryImpl.cpp |  220 --------------
 src/BridgeFactoryImpl.h   |   55 ----
 src/BridgeImpl.cpp        |  699 +++++++++++++++------------------------------
 src/BridgeImpl.h          |  187 ++++++-------
 src/BridgeListenerMgr.cpp |   37 +++
 src/BridgeListenerMgr.h   |   36 +++
 src/BridgeManagerImpl.cpp |  148 ++++++++++
 src/BridgeManagerImpl.h   |   59 ++++
 src/BridgeServiceImpl.h   |    4 +-
 src/CMakeLists.txt        |    8 +-
 src/InternalExceptions.h  |   45 +++
 src/ListenerManager.h     |  141 +++++++++
 src/Logger.h              |    4 +-
 src/MediaSplicer.cpp      |   75 +++--
 src/MediaSplicer.h        |    7 +-
 src/Service.cpp           |   71 +----
 test/CMakeLists.txt       |    9 +-
 test/TestBridging.cpp     |  119 ++++----
 21 files changed, 919 insertions(+), 1011 deletions(-)
 delete mode 100644 src/BridgeFactoryImpl.cpp
 delete mode 100644 src/BridgeFactoryImpl.h
 create mode 100644 src/BridgeListenerMgr.cpp
 create mode 100644 src/BridgeListenerMgr.h
 create mode 100644 src/BridgeManagerImpl.cpp
 create mode 100644 src/BridgeManagerImpl.h
 create mode 100644 src/InternalExceptions.h
 create mode 100644 src/ListenerManager.h


- Log -----------------------------------------------------------------
commit 1f827738e764ef4a09b669972bc65f4495373d20
Author: Brent Eagles <beagles at digium.com>
Date:   Fri Sep 10 13:34:12 2010 -0230

    More refactorings for UML redesign.
    (Test suite updates underway)

diff --git a/src/BridgeListenerMgr.cpp b/src/BridgeListenerMgr.cpp
new file mode 100644
index 0000000..8aa176e
--- /dev/null
+++ b/src/BridgeListenerMgr.cpp
@@ -0,0 +1,37 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#include "BridgeListenerMgr.h"
+// Sets up the base manager on topic `name` and caches the topic's publisher as a BridgeListenerPrx (unchecked cast).
+AsteriskSCF::BridgeService::BridgeListenerMgr::BridgeListenerMgr(const Ice::CommunicatorPtr& comm,
+        const std::string& name,
+        const AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx& bridgeProxy) :
+    ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx>(comm, name),
+    mPrx(bridgeProxy)
+{
+    mPublisher = AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx::uncheckedCast(mTopic->getPublisher());
+}
+// Publishes sessionsAdded on the topic, passing the bridge proxy supplied at construction along with `sessions`.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::sessionsAdded(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+{
+    mPublisher->sessionsAdded(mPrx, sessions);
+}
+// Publishes sessionsRemoved on the topic, passing the bridge proxy supplied at construction along with `sessions`.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::sessionsRemoved(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+{
+    mPublisher->sessionsRemoved(mPrx, sessions);
+}
+// Publishes stopped on the topic, passing the bridge proxy supplied at construction.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::stopped()
+{
+    mPublisher->stopped(mPrx);
+}
+// Publishes stopping on the topic, passing the bridge proxy supplied at construction.
+void AsteriskSCF::BridgeService::BridgeListenerMgr::stopping()
+{
+    mPublisher->stopping(mPrx);
+}
diff --git a/src/BridgeListenerMgr.h b/src/BridgeListenerMgr.h
new file mode 100644
index 0000000..5c4b640
--- /dev/null
+++ b/src/BridgeListenerMgr.h
@@ -0,0 +1,36 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
+#pragma once
+#include <string>
+#include <Ice/Ice.h>
+#include "ListenerManager.h"
+#include <SessionCommunications/Bridging/BridgingIf.h>
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    class BridgeListenerMgr : virtual public ListenerManagerT<AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx>
+    {
+    public:
+        BridgeListenerMgr(const Ice::CommunicatorPtr& communicator, const std::string& name,
+                const AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx& source);
+        // Notifiers: each forwards to the like-named operation on the topic's publisher proxy, passing mPrx first.
+        void sessionsAdded(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+        void sessionsRemoved(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions);
+        void stopped();
+        void stopping();
+        // mPrx is the bridge proxy given as `source`; mPublisher is the topic publisher cast to BridgeListenerPrx.
+    private:
+        AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx mPrx;
+        AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx mPublisher;
+    };
+
+    typedef IceUtil::Handle<BridgeListenerMgr> BridgeListenerMgrPtr;
+} // End of namespace BridgeService
+} // End of namespace AsteriskSCF
diff --git a/src/InternalExceptions.h b/src/InternalExceptions.h
new file mode 100644
index 0000000..13c5d43
--- /dev/null
+++ b/src/InternalExceptions.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <exception>
+#include <string>
+#include <sstream>
+
+// TODO: Some exceptions that might be better off in the shared codebase.
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    class ConfigException : public std::exception
+    {
+    public:
+        // Configuration failure; what() is "<propertyName> configuration error: " + message, or "(no message)" if empty.
+        ConfigException(const std::string& propertyName, const std::string& message) 
+        {
+            std::stringstream what;
+            what << propertyName << " configuration error: ";
+            if(message.size() != 0)
+            {
+                what << message;
+            }
+            else
+            {
+                what << "(no message)";
+            }
+            mWhat = what.str();
+        }
+        // throw() matches the exception specification of std::exception's destructor.
+        ~ConfigException() throw()
+        {
+        }
+        // Returns the text assembled by the constructor; the pointer refers to mWhat's internal buffer.
+        const char* what() const throw()
+        {
+            return mWhat.c_str();
+        }
+
+    private:
+        std::string mWhat;
+    };
+}
+}
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
new file mode 100644
index 0000000..a307fd3
--- /dev/null
+++ b/src/ListenerManager.h
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <string>
+#include <algorithm>
+#include <vector>
+#include "InternalExceptions.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    // Helper template for classes that need to implement listener style interfaces.
+    // At construction it resolves the named IceStorm topic (creating it if it does not exist) and
+    // afterwards keeps the list of listener proxies (type T) that were subscribed through it.
+    template <class T>
+    class ListenerManagerT : public IceUtil::Shared
+    {
+        typedef std::vector<T> ListenerSeq;
+        typedef typename ListenerSeq::iterator ListenerIter;
+    public:
+        ListenerManagerT(const Ice::CommunicatorPtr& communicator, const std::string& topicName) :
+            mCommunicator(communicator),
+            mTopicName(topicName)
+        {
+            // Throws ConfigException when the topic manager property is missing, its proxy cannot be
+            // used, or no topic proxy could be obtained.
+            // TODO: While this is being concocted for a single component, it would make more sense
+            // to have the topic manager passed in during construction for more general usage.
+            const std::string propertyName = "TopicManager.Proxy";
+            std::string topicManagerProperty = mCommunicator->getProperties()->getProperty(propertyName);
+            if(topicManagerProperty.size() == 0)
+            {
+                throw ConfigException(propertyName, "Topic manager proxy property missing. "
+                        "Unable to initialize listener support.");
+            }
+            // Ice failures while parsing or casting the proxy are folded into the null-proxy check that follows.
+            try
+            {
+                mTopicManager = IceStorm::TopicManagerPrx::checkedCast(mCommunicator->stringToProxy(topicManagerProperty));
+            }
+            catch(const Ice::Exception&)
+            {
+            }
+            if(!mTopicManager)
+            {
+                throw ConfigException(propertyName, "Topic manager proxy is not a valid proxy or is unreachable");
+            }
+            // Try to retrieve an existing topic first; a missing topic is created below.
+            try
+            {
+                mTopic = mTopicManager->retrieve(mTopicName);
+            }
+            catch(const IceStorm::NoSuchTopic&)
+            {
+            }
+
+            if(!mTopic)
+            {
+                try
+                {
+                    mTopic = mTopicManager->create(mTopicName);
+                }
+                catch(const IceStorm::TopicExists&)
+                {
+                    //
+                    // In case there is a race condition when creating the topic.
+                    //
+                    mTopic = mTopicManager->retrieve(mTopicName);
+                }
+            }
+
+            if(!mTopic)
+            {
+                throw ConfigException(propertyName,
+                        std::string("unable to create topic with the provided configuration :") + mTopicName);
+            }
+        }
+
+        virtual ~ListenerManagerT()
+        {
+            //
+            // TODO: Destroy topic.
+            //
+        }
+
+        // Records the listener (at most once) and subscribes it to the topic with "ordered" reliability.
+        // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
+        // and whatnot are not flagged; the subscribe call is issued even for an already recorded
+        // listener, and it runs while the write lock is held.
+        void addListener(const T& listener)
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+            {
+                mListeners.push_back(listener);
+            }
+            IceStorm::QoS qos;
+            qos["reliability"] = "ordered";
+            try
+            {
+                mTopic->subscribeAndGetPublisher(qos, listener);
+            }
+            catch(const IceStorm::AlreadySubscribed&)
+            {
+                //
+                // This indicates some kind of inconsistent state.
+                //
+            }
+        }
+        // Forgets the listener and unsubscribes it from the topic; an unknown listener is a no-op.
+        void removeListener(const T& listener)
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            ListenerIter i = std::find(mListeners.begin(), mListeners.end(), listener);
+            if(i != mListeners.end())
+            {
+                mListeners.erase(i);
+                mTopic->unsubscribe(listener);
+            }
+        }
+        // Returns a copy of the listener list taken under a shared (read) lock.
+        std::vector<T> getListeners()
+        {
+            boost::shared_lock<boost::shared_mutex> lock(mLock);
+            ListenerSeq result(mListeners);
+            return result;
+        }
+
+    protected:
+        boost::shared_mutex mLock;
+        Ice::CommunicatorPtr mCommunicator;
+        std::string mTopicName;
+        IceStorm::TopicPrx mTopic;
+        IceStorm::TopicManagerPrx mTopicManager;
+        ListenerSeq mListeners;
+    };
+}
+}

commit 9edb8c0f573e555691f058d73767ac0be4797aa3
Author: Brent Eagles <beagles at digium.com>
Date:   Fri Sep 10 13:26:11 2010 -0230

    Majority of the service refactoring done with.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ac1e98b..534deef 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -10,7 +10,7 @@ include(cmake/Hydra_v4.cmake)
 hydra_project("bridging service" 3.4 CXX)
 
 # Take care of slice definitions
-add_subdirectory(slice EXCLUDE_FROM_ALL)
+add_subdirectory(slice)
 
 # Include the subdirectory information. This is separate so that
 # a higher-level CMakeLists.txt can act as a master for integrated
diff --git a/cmake b/cmake
index 1095aeb..1e7a172 160000
--- a/cmake
+++ b/cmake
@@ -1 +1 @@
-Subproject commit 1095aeb680f1c6c2c411d80657e2c066bf7aa6d8
+Subproject commit 1e7a1725fe9e2d176a2c26820d6bcd96c6c3939e
diff --git a/slice b/slice
index dcb271b..14413db 160000
--- a/slice
+++ b/slice
@@ -1 +1 @@
-Subproject commit dcb271baaca90fa89ed6b7d4846ea458fb303943
+Subproject commit 14413db47bfae3d1ff57d36e80cfe700d755ae0b
diff --git a/src/BridgeFactoryImpl.cpp b/src/BridgeFactoryImpl.cpp
deleted file mode 100644
index e44cd74..0000000
--- a/src/BridgeFactoryImpl.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-#include "BridgeFactoryImpl.h"
-#include <Ice/Ice.h>
-#include <boost/thread/locks.hpp>
-
-namespace Hydra
-{
-namespace BridgeService
-{
-    //
-    // Functor used with for_each on shutdown.
-    //
-    class ShutdownFunctor : public std::unary_function<Hydra::BridgeService::BridgeImplPtr, void>
-    {
-    public:
-        ShutdownFunctor(const Ice::Current& c) :
-            mCurrent(c)
-        {
-        }
-
-        void operator()(Hydra::BridgeService::BridgeImplPtr b) 
-        { 
-            b->shutdown(mCurrent); 
-        }
-
-    private:
-        const Ice::Current mCurrent;
-    };
-
-} // End of namespace BridgeService
-} // End of namespace Hydra
-
-Hydra::BridgeService::BridgeFactoryImpl::BridgeFactoryImpl(Ice::ObjectAdapterPtr adapter, const Hydra::Core::Bridging::V1::BridgeEventsPrx& ev) :
-    mShuttingDown(false),
-    mSuspended(false),
-    mAdapter(adapter),
-    mBridgeEvents(ev)
-{
-    mLogger.getInfoStream() << "Created Hydra Session-Oriented Bridge Factory." << std::endl; 
-}
-
-Hydra::Core::Bridging::V1::BridgePrx Hydra::BridgeService::BridgeFactoryImpl::createBridge(
-  const Hydra::Core::Endpoint::V1::BaseEndpointPtr& ep,
-  const Hydra::Core::Endpoint::V1::EndpointSeq& endpoints,
-  const Hydra::Core::Bridging::V1::BridgeMonitorPrx& mgr,
-  const Ice::Current& current)
-{
-    mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
-    //
-    // Verify the endpoints are of the correct type before doing anything else.
-    //
-    mLogger.getDebugStream() << __FUNCTION__ << ": validating endpoints." << std::endl;
-    Hydra::Session::V1::SessionEndpointPtr adminEp(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(ep));
-    if(ep != 0)
-    {
-        if(!adminEp)
-        {
-            mLogger.getDebugStream() << __FUNCTION__ << ": casting admin endpoint resulted in null value." << std::endl;
-            throw Hydra::Core::Bridging::V1::UnsupportedEndpoint(ep->id);
-        }
-        if(!checkEndpointId(*ep->id))
-        {
-            mLogger.getDebugStream() << __FUNCTION__ << ": admin endpoint had invalid id." << std::endl;
-            throw Hydra::Core::Endpoint::V1::InvalidEndpointId(ep->id);
-        }
-    }
-
-    Hydra::Session::V1::SessionEndpointSeq eps;
-    mLogger.getDebugStream() << __FUNCTION__ << ": scanning " << eps.size() << " endpoints." << std::endl;
-    for(Hydra::Core::Endpoint::V1::EndpointSeq::const_iterator i = endpoints.begin(); i != endpoints.end(); ++i)
-    {
-        if(*i == 0)
-        {
-            mLogger.getDebugStream() << __FUNCTION__ << ": null endpoint encountered." << std::endl;
-            throw Hydra::Core::Bridging::V1::UnsupportedEndpoint(0);
-        }
-        Hydra::Session::V1::SessionEndpointPtr t(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(*i));
-        if(!t)
-        {
-            mLogger.getDebugStream() << __FUNCTION__ << ": casting endpoint resulted in null value." << std::endl;
-            throw Hydra::Core::Bridging::V1::UnsupportedEndpoint((*i)->id);
-        }
-        if(!t->id || !checkEndpointId(*t->id))
-        {
-            if(mLogger.debugTracing())
-            {
-                if(!t->id)
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": endpoint has null id." << std::endl;
-                }
-                else
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": id (" << t->id->endpointManagerId << ":" << t->id->destinationId << ") is invalid." << std::endl;
-                }
-            }
-            throw Hydra::Core::Endpoint::V1::InvalidEndpointId(t->id);
-        }
-        if(adminEp != 0 && adminEp->id != 0)
-        {
-            if(adminEp->id->endpointManagerId == t->id->endpointManagerId && adminEp->id->destinationId == t->id->destinationId)
-            {
-                mLogger.getDebugStream() << __FUNCTION__ << ": endpoint id collision between admin endpoint and initial endpoint set for id " <<
-                    adminEp->id->endpointManagerId << ":" << adminEp->id->destinationId << std::endl;
-                throw Hydra::Core::Bridging::V1::EndpointCollision(adminEp->id, t->id); 
-            }
-        }
-        //
-        // Kind of icky but necessary. The bridge cannot an endpoint registered more than twice.
-        //
-        for(Hydra::Session::V1::SessionEndpointSeq::const_iterator j = eps.begin(); j != eps.end(); ++j)
-        {
-            if((*j)->id->endpointManagerId == t->id->endpointManagerId && (*j)->id->destinationId == t->id->destinationId)
-            {
-                mLogger.getDebugStream() << __FUNCTION__ << ": endpoint id collision between endpoints in initial endpoint set " <<
-                    t->id->endpointManagerId << ":" << t->id->destinationId << std::endl;
-                throw Hydra::Core::Bridging::V1::EndpointCollision((*j)->id, t->id); 
-            }
-        }
-        eps.push_back(t);
-    }
-    mLogger.getDebugStream() << __FUNCTION__ << ": endpoints have been successfully validated." << std::endl;
-
-    //
-    // It is arguable that it might be better to do the shutting down check before going through all the casting work. 
-    // This order has been chosen as the work above is actually an argument validation check that will be required
-    // for all running bridges and it was felt that it was better to arrange things so the lock was not obtained
-    // under normal running conditions for the check. The alternative would be to do two locks, one to check the 
-    // shutdown state and then anothert to modify the state.. but that that is probably more costly still.
-    //
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if(mShuttingDown)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
-        throw Hydra::System::Component::V1::ShuttingDown();
-    }
-    if(mSuspended)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
-        throw Hydra::System::Component::V1::Suspended();
-    }
-    reap();
-
-    Hydra::BridgeService::BridgeImplPtr bridge = new Hydra::BridgeService::BridgeImpl(mAdapter, adminEp, eps, mgr, mBridgeEvents);
-    Ice::ObjectPrx obj = mAdapter->addWithUUID(bridge);
-    mAdapter->addFacet(bridge->signalCB(), obj->ice_getIdentity(), "SignalCallback");
-    mBridges.push_back(bridge);
-    mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": creating new bridge " << obj->ice_toString() << "." << std::endl;
-    return Hydra::Core::Bridging::V1::BridgePrx::uncheckedCast(obj);
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::shutdown(const Ice::Current& current)
-{
-    mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if(mShuttingDown)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
-        return;
-    }
-    if(mSuspended)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
-        throw Hydra::System::Component::V1::Suspended();
-    }
-    mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": shutting down." << std::endl;
-    mShuttingDown = true;
-    reap();
-    std::for_each(mBridges.begin(), mBridges.end(), Hydra::BridgeService::ShutdownFunctor(current));
-
-    mAdapter->getCommunicator()->shutdown();
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::suspend(const Ice::Current& current)
-{
-    mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if(mShuttingDown)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
-        throw Hydra::System::Component::V1::ShuttingDown();
-    }
-    if(mSuspended)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
-        return;
-    }
-    mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": suspending." << std::endl;
-    mSuspended = true;
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::resume(const Ice::Current& current)
-{
-    mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if(mShuttingDown)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
-        throw Hydra::System::Component::V1::ShuttingDown();
-    }
-    if(!mSuspended)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
-        return;
-    }
-    mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": resuming." << std::endl;
-    mSuspended = false;
-}
-
-void Hydra::BridgeService::BridgeFactoryImpl::reap()
-{
-    mLogger.getDebugStream() << __FUNCTION__ << ": reaping bridge set of " << mBridges.size() << " bridges." << std::endl;
-    for(std::vector<Hydra::BridgeService::BridgeImplPtr>::iterator i = mBridges.begin(); i != mBridges.end(); ++i)
-    {
-        if((*i)->destroyed())
-        {
-            std::vector<Hydra::BridgeService::BridgeImplPtr>::iterator t = i;
-            mBridges.erase(t);
-        }
-    }
-    mLogger.getDebugStream() << __FUNCTION__ << ": reaping completed, bridge set size is now " << mBridges.size() << "." << std::endl;
-}
diff --git a/src/BridgeFactoryImpl.h b/src/BridgeFactoryImpl.h
deleted file mode 100644
index eeb2dea..0000000
--- a/src/BridgeFactoryImpl.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#pragma once
-#ifndef __HYDRA_BRIDGE_FACTORY_IMPL_H
-#define __HYDRA_BRIDGE_FACTORY_IMPL_H
-
-#include <Core/Bridging/BridgeServiceIf.h>
-#include <Core/Bridging/BridgeServiceEventsIf.h>
-#include <boost/thread/shared_mutex.hpp>
-#include <vector>
-
-#include "BridgeImpl.h"
-#include "Logger.h"
-
-namespace Hydra
-{
-   namespace BridgeService
-{
-   class BridgeFactoryImpl : public Core::Bridging::V1::BridgeFactory
-   {
-   public:
-
-      BridgeFactoryImpl(Ice::ObjectAdapterPtr adapter, const Core::Bridging::V1::BridgeEventsPrx& events);
-
-      //
-      // Hydra::Core::Bridging::V1::BridgeFactory Interface
-      //
-      Core::Bridging::V1::BridgePrx createBridge(
-        const Core::Endpoint::V1::BaseEndpointPtr& ep,
-        const Core::Endpoint::V1::EndpointSeq& endpoints,
-        const Core::Bridging::V1::BridgeMonitorPrx& monitor,
-        const Ice::Current& current);
-
-      //
-      // Hydra::System::Component::V1::ComponentService Interface.
-      //
-      void suspend(const Ice::Current& current);
-      void resume(const Ice::Current& current);
-      void shutdown(const Ice::Current& current);
-
-   private:
-
-      boost::shared_mutex mLock;
-      std::vector<BridgeImplPtr> mBridges;
-      bool mShuttingDown;
-      bool mSuspended;
-      Ice::ObjectAdapterPtr mAdapter;
-      Logger mLogger;
-      Core::Bridging::V1::BridgeEventsPrx mBridgeEvents;
-
-      void reap();
-   };
-
-   typedef IceUtil::Handle<BridgeFactoryImpl> BridgeFactoryImplPtr;
-};
-};
-#endif
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index fe66533..ee1106a 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1,451 +1,327 @@
 #include "BridgeImpl.h"
 #include <System/Component/ComponentServiceIf.h>
+#include <SessionCommunications/SessionCommunicationsIf.h>
 #include <Ice/Ice.h>
 #include <memory>
+#include <algorithm>
 #include <boost/thread/locks.hpp>
+
+//
+// Compiled in constants.
+// TODO: Replace with configuration!
+//
+
+static const std::string TopicPrefix("AsteriskSCF.Bridge.");
+
 //
 // TODO:
-// Operations that are performed on all bridge endpoints might be better done as AMI requests.
+// Operations that are performed on all bridge sessions might be better done as AMI requests.
 // 
-namespace Hydra
+namespace AsteriskSCF
 {
 namespace BridgeService
 {
-    class EventTopicWrapper
+    //
+    // Functor to support using for_each on shutdown.
+    //
+    class ShutdownImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
     {
     public:
-        EventTopicWrapper(const Core::Bridging::V1::BridgeEventsPrx& ev) :
-            mEvents(ev)
-        {
-        }
-
-        void endpointAdded(const Core::Endpoint::V1::BaseEndpointPtr& ep)
-        {
-            if(mEvents)
-            {
-                try
-                {
-                    mEvents->endpointAdded(ep);
-                }
-                catch(const Ice::Exception& ex)
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " <<  ex << std::endl;
-                }
-            }
-        }
-
-        void endpointRemoved(const Core::Endpoint::V1::BaseEndpointPtr& ep)
-        {
-            if(mEvents)
-            {
-                try
-                {
-                    mEvents->endpointRemoved(ep);
-                }
-                catch(const Ice::Exception& ex)
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " <<  ex << std::endl;
-                }
-            }
-        }
-
-        void shuttingDown()
+        ShutdownImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
+            mResponse(response)
         {
-            if(mEvents)
-            {
-                try
-                {
-                    mEvents->shuttingDown();
-                }
-                catch(const Ice::Exception& ex)
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " <<  ex << std::endl;
-                }
-            }
         }
 
-        void stopped()
+        void operator()(const BridgeImpl::BridgeSession& b) 
         {
-            if(mEvents)
+            b.session->stop(mResponse);
+            if(b.connector)
             {
-                try
-                {
-                    mEvents->stopped();
-                }
-                catch(const Ice::Exception& ex)
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": publishing failed with " <<  ex << std::endl;
-                }
+                b.connector->unplug();
             }
         }
-
     private:
-        Core::Bridging::V1::BridgeEventsPrx mEvents;
-        Logger mLogger;
+        AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
     };
-
-    //
-    // Functor to support using for_each on shutdown.
-    //
-    class BridgeShutdownFunctor : public std::unary_function<BridgeImpl::BridgeEndpoint, void>
+    
+    class ProgressingImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
     {
     public:
-        BridgeShutdownFunctor(const Core::Endpoint::V1::EndpointIdPtr& id) :
-            mId(id)
+        ProgressingImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
+            mResponse(response)
         {
         }
 
-        void operator()(const BridgeImpl::BridgeEndpoint& b) 
-        { 
-            if(b.endpoint)
-            {
-                Hydra::Session::V1::SessionEndpointPtr p(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(b.endpoint));
-                assert(p != 0);
-                if(p->callback)
-                {
-                    p->command->terminate(mId);
-                }
-                if(b.connector)
-                {
-                    b.connector->unplug();
-                }
-            }
+        void operator()(const BridgeImpl::BridgeSession& b) 
+        {
+            b.session->progress(mResponse);
         }
     private:
-        Core::Endpoint::V1::EndpointIdPtr mId;
+        AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
     };
 
-    //
-    // Functor to support broadcasting busy notifications.
-    //
-    struct BusyImpl : public std::binary_function<Hydra::Core::Endpoint::V1::EndpointIdPtr, Hydra::Session::V1::SignalCallbackPrx, void>
+    struct RingImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
     {
-        void operator()(const Hydra::Core::Endpoint::V1::EndpointIdPtr source, const Hydra::Session::V1::SignalCallbackPrx& p)
-        { 
-            p->busy(source);
+        void operator()(const BridgeImpl::BridgeSession& b) 
+        {
+            b.session->ring();
         }
     };
 
-    struct RingImpl : public std::binary_function<Hydra::Core::Endpoint::V1::EndpointIdPtr, Hydra::Session::V1::SignalCallbackPrx, void>
+    struct FlashImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
     {
-        void operator()(const Hydra::Core::Endpoint::V1::EndpointIdPtr source, const Hydra::Session::V1::SignalCallbackPrx& p)
-        { 
-            p->ring(source);
+        void operator()(const BridgeImpl::BridgeSession& b) 
+        {
+            b.session->flash();
         }
     };
-
-    class BroadcasterBase : public std::unary_function<BridgeImpl::BridgeEndpoint, void>
+    
+    struct HoldImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
     {
-    public:
-        BroadcasterBase(const Core::Endpoint::V1::EndpointIdPtr& id) :
-            mId(id) 
-        {
-        }
-
-        void operator()(const BridgeImpl::BridgeEndpoint& b) 
-        { 
-            if(b.endpoint)
-            {
-                Hydra::Session::V1::SessionEndpointPtr p(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(b.endpoint));
-                assert(p != 0);
-                if(p->callback)
-                {
-                    doOp(p->callback);
-                }
-            }
-
-        }
-    protected:
-        Core::Endpoint::V1::EndpointIdPtr mId;
-
-        virtual void doOp(const Hydra::Session::V1::SignalCallbackPrx& p)
+        void operator()(const BridgeImpl::BridgeSession& b) 
         {
+            b.session->hold();
         }
     };
-
-    template <class F>
-        class Broadcaster : public BroadcasterBase
+    
+    struct UnholdImpl : public std::unary_function<BridgeImpl::BridgeSession, void>
     {
-    public:
-        Broadcaster(const Core::Endpoint::V1::EndpointIdPtr& id) :
-            BroadcasterBase(id) 
-        {
-        }
-    protected:
-        void doOp(const Hydra::Session::V1::SignalCallbackPrx& p)
+        void operator()(const BridgeImpl::BridgeSession& b) 
         {
-            F f;
-            f(mId, p);
+            b.session->unhold();
         }
     };
 
-    template <class F>
-    class BroadcasterWithResponseCode : public BroadcasterBase
+    class FindImpl : public std::unary_function<BridgeImpl::BridgeSession, bool>
     {
     public:
-        BroadcasterWithResponseCode(const Core::Endpoint::V1::EndpointIdPtr& id, const Session::V1::ResponseCodePtr& responseCode) : 
-            BroadcasterBase(id),
-            mResponseCode(responseCode)
+        FindImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) :
+            mPrx(prx)
         {
         }
-
-    protected:
-        Session::V1::ResponseCodePtr mResponseCode;
-        void doOp(const Hydra::Session::V1::SignalCallbackPrx& p)
+        
+        bool operator()(const BridgeImpl::BridgeSession& b)
         {
-            F f;
-            f(mId, p, mResponseCode);
+            return b.session == mPrx;
         }
+    private:
+        AsteriskSCF::SessionCommunications::V1::SessionPrx mPrx;
     };
 
-    class DefaultBridgeSignallingCallback : public Session::V1::SignalCallback
+
+    //
+    // For events that require modification to the bridge, we use helper methods on the bridge itself.
+    // For events that result in distribution to the bridge sessions, we copy the current sessions and
+    // run the calls from the listener itself.
+    //
+    class SessionListener : public AsteriskSCF::SessionCommunications::V1::SessionListener
     {
     public:
-
-        DefaultBridgeSignallingCallback(const BridgeImplPtr& bridge) :
-            mBridge(bridge)
-        {
-        }
-
-        void ring(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+        SessionListener(const BridgeImplPtr& b) :
+            mBridge(b)
         {
-            mBridge->endpointRinging(p);
-        }
-
-        void connected(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
-        {
-            //
-            // It's here where the actual connection of then endpoint needs to be setup.
-            //
-            mBridge->endpointConnected(p);
         }
 
-        void terminated(const Core::Endpoint::V1::EndpointIdPtr& p, const Session::V1::ResponseCodePtr& r, const Ice::Current& current)
+        void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
         {
-            if(mBridge->endpointTerminated(p, r, current) == 1)
-            {
-                mBridge->shutdown(current);
-            }
+            mBridge->sessionConnected(source);
         }
 
-        void busy(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+        void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
         {
-            mBridge->endpointBusy(p);
         }
 
-        void congestion(const Core::Endpoint::V1::EndpointIdPtr& p, const Session::V1::ResponseCodePtr& r, const Ice::Current&)
+        void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
         {
         }
 
-        void hold(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+        void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
         {
         }
 
-        void unhold(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+        void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
         {
+            std::vector<BridgeImpl::BridgeSession> sessions(mBridge->currentSessions());
+            std::for_each(sessions.begin(), sessions.end(), RingImpl());
         }
 
-        void flash(const Core::Endpoint::V1::EndpointIdPtr& p, const Ice::Current&)
+        void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current& current)
         {
+            size_t endpointCount = mBridge->sessionStopped(source, response);
+            if(endpointCount < 2)
+            {
+                mBridge->shutdown(current);
+            }
         }
 
-        void progress(const Core::Endpoint::V1::EndpointIdPtr& p, const Session::V1::ResponseCodePtr& r, const Ice::Current&)
+        void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
         {
         }
 
-
     private:
         BridgeImplPtr mBridge;
     };
-
+    
 } // End of namespace BridgeService
-} // End of namespace Hydra
-
-static std::string endpointIdToString(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
-{
-    return id->endpointManagerId + ":" + id->destinationId;
-}
+} // End of namespace AsteriskSCF
 
-Hydra::BridgeService::BridgeImpl::BridgeImpl(
+AsteriskSCF::BridgeService::BridgeImpl::BridgeImpl(
   const Ice::ObjectAdapterPtr& adapter,
-  const Hydra::Session::V1::SessionEndpointPtr& adminEp,
-  const Hydra::Session::V1::SessionEndpointSeq& initialEndpoints,
-  const Hydra::Core::Bridging::V1::BridgeMonitorPrx& manager,
-  const Hydra::Core::Bridging::V1::BridgeEventsPrx& ev
+  const AsteriskSCF::SessionCommunications::V1::SessionSeq& initialSessions,
+  const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& ev,
+  const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr
   ) :
     mState(Running),
-    mMonitor(manager),
-    mEvents(new Hydra::BridgeService::EventTopicWrapper(ev)),
-    mAdminEndpointInList(true),
-    mBridgeId(new Hydra::Core::Endpoint::V1::EndpointId),
-    mObjAdapter(adapter)
+    mObjAdapter(adapter),
+    mListeners(listenerMgr),
+    mSessionListener(new SessionListener(this))
 {
-    if(adminEp)
-    {
-        mAdminEndpoint = std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>(new Hydra::BridgeService::BridgeImpl::BridgeEndpoint(adminEp, 0));
-        mAdminEndpoint->connector = mSplicer.connect(endpointIdToString(adminEp->id), adminEp->mediaSession);
-    }
-    for(Hydra::Session::V1::SessionEndpointSeq::const_iterator i = initialEndpoints.begin(); i != initialEndpoints.end(); ++i)
-    {
-        mQueuedEndpoints.insert(std::pair<std::string, Hydra::Session::V1::SessionEndpointPtr>(endpointIdToString((*i)->id), *i));
-    }
-    mSignallingServant = new DefaultBridgeSignallingCallback(this);
-    mSignalling = Hydra::Session::V1::SignalCallbackPrx::uncheckedCast(mObjAdapter->addWithUUID(mSignallingServant));
-    for(Hydra::Session::V1::SessionEndpointSeq::const_iterator i = initialEndpoints.begin(); i != initialEndpoints.end(); ++i)
+    mListeners->addListener(ev);
+    mSessionListenerPrx = AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(mObjAdapter->addWithUUID(mSessionListener));
+
+    for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = initialSessions.begin();
+        i != initialSessions.end(); ++i)
     {
-        (*i)->command->call(mBridgeId, (*i)->id, mSignalling);
+        AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info = (*i)->addListener(mSessionListenerPrx);
+        //
+        // We need to define these states! Especially the ones that define when start is called or not.
+        //
+        if(info->currentState == "ready")
+        {
+            (*i)->start();
+            mSessions.push_back(BridgeSession(*i, 0));
+        }
+        else
+        {
+            mSessions.push_back(BridgeSession(*i, mSplicer.connect(*i)));
+        }
     }
+    mListeners->sessionsAdded(initialSessions);
 }
 
-Hydra::BridgeService::BridgeImpl::~BridgeImpl()
+AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
 {
     try
     {
-        mObjAdapter->remove(mSignalling->ice_getIdentity());
+        //
+        // We don't want to remove all listener activity unless we are definitively destroyed. Any other
+        // situations where the bridge servant might be released from memory might be a temporary condition
+        // where we may want events to hold until some kind of timeout causes the listener to be removed.
+        //
+        if(mState == Destroyed)
+        {
+            mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+        }
     }
     catch(...)
     {
     }
 }
 
-void Hydra::BridgeService::BridgeImpl::addEndpoint(const Hydra::Core::Endpoint::V1::BaseEndpointPtr& ep, const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::addSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
 {
-    mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
-    if(ep == 0 || ep->id == 0)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": attempting to null endpoint or endpoint with invalid id." << std::endl;
-        throw Hydra::Core::Endpoint::V1::InvalidEndpointId(new Hydra::Core::Endpoint::V1::EndpointId);
-    }
-
-    if(!checkEndpointId(*ep->id))
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": attempting to add endpoint with invalid id " << ep->id->endpointManagerId << ":" << ep->id->destinationId << "."  << std::endl;
-        throw Hydra::Core::Endpoint::V1::InvalidEndpointId(ep->id);
-    }
-
-    Hydra::Session::V1::SessionEndpointPtr newEndpoint(Hydra::Session::V1::SessionEndpointPtr::dynamicCast(ep));
-    if(!newEndpoint)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": endpoint with id " << ep->id->endpointManagerId << ":" << ep->id->destinationId << " failed dynamic cast." << std::endl;
-        throw Hydra::Core::Bridging::V1::UnsupportedEndpoint(ep->id);
-    }
+    AsteriskSCF::SessionCommunications::V1::SessionSeq addedSessions;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         statePreCheck();
-        std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator  i = find(ep->id);
-        if(i != mEndpoints.end())
+        for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
+            i != sessions.end(); ++i)
         {
-            mLogger.getDebugStream() << __FUNCTION__ << ": endpoint with id " << ep->id->endpointManagerId << ":" << ep->id->destinationId << " already registered." << std::endl;
-            throw Hydra::Core::Bridging::V1::EndpointAlreadyRegistered(ep->id);
-        }
-
-        if(mMonitor)
-        {
-            bool result = mMonitor->onAddEndpoint(ep);
-            mLogger.getDebugStream() << __FUNCTION__ << "onAddEndpoint() returned " << result << std::endl;
-        }
+            //
+            // TODO: how do we want to handle sessions that have already been added to the bridge? It's pretty much
+            // impossible to guard against race conditions where multiple call paths might want to add a session
+            // more than once for some reason. We should probably just log it and move on.
+            //
+            std::vector<BridgeSession>::iterator j = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
+            if(j != mSessions.end())
+            {
+                continue;
+            }
 
-        if(newEndpoint->command)
-        {
-            mQueuedEndpoints.insert(std::pair<std::string, Hydra::Session::V1::SessionEndpointPtr>(endpointIdToString(newEndpoint->id), newEndpoint));
-        }
-        else
-        {
-            mEndpoints.push_back(BridgeEndpoint(newEndpoint, mSplicer.connect(endpointIdToString(newEndpoint->id), newEndpoint->mediaSession)));
+            AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info = (*i)->addListener(mSessionListenerPrx);
+            //
+            // We need to define these states! Especially the ones that define when start is called or not.
+            //
+            if(info->currentState == "ready")
+            {
+                (*i)->start();
+                mSessions.push_back(BridgeSession(*i, 0));
+            }
+            else
+            {
+                mSessions.push_back(BridgeSession(*i, mSplicer.connect(*i)));;
+            }
+                
+            addedSessions.push_back(*i);
         }
     }
-    //
-    // Don't perform the RPC with the lock held!
-    //
-    if(newEndpoint->command)
+    if(addedSessions.size())
     {
-        try
-        {
-            newEndpoint->command->call(mBridgeId, newEndpoint->id, mSignalling);
-        }
-        catch(...)
-        {
-            boost::unique_lock<boost::shared_mutex> lock(mLock);
-            mQueuedEndpoints.erase(endpointIdToString(newEndpoint->id));
-            throw;
-        }
+        mListeners->sessionsAdded(addedSessions);
     }
-
-    mLogger.getDebugStream() << __FUNCTION__ << ": bridge " << current.adapter->getCommunicator()->identityToString(current.id) <<
-         " now has " << mEndpoints.size() << " endpoints." << std::endl;
-    mEvents->endpointAdded(newEndpoint);
 }
 
-void Hydra::BridgeService::BridgeImpl::removeEndpoint(const Hydra::Core::Endpoint::V1::EndpointIdPtr& ep, const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::removeSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
 {
-    mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
-    statePreCheck();
-    if(ep == 0)
-    {
-        throw Hydra::Core::Endpoint::V1::InvalidEndpointId(0);
-    }
-    if(!checkEndpointId(*ep))
+    AsteriskSCF::SessionCommunications::V1::SessionSeq removedSessions;
     {
-        throw Hydra::Core::Endpoint::V1::InvalidEndpointId(ep);
-    }
-    std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator  i = find(ep);
-    if(i == mEndpoints.end())
-    {
-        throw Hydra::Core::Bridging::V1::UnknownEndpoint(ep);
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        statePreCheck();
+        for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
+        {
+            std::vector<BridgeSession>::iterator j = std::find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
+            if(j != mSessions.end())
+            {
+                j->session->removeListener(mSessionListenerPrx);
+                j->connector->unplug();
+                mSessions.erase(j);
+                removedSessions.push_back(*i);
+            }
+            else
+            {
+                //
+                // TODO: how do we want to handle sessions that aren't there? It's pretty much impossible to guard against race conditions where
+                // an expected session may no longer be present (device hung up, etc). In other words, this should probably be expected and maybe
+                // just logged.
+                //
+            }
+        }
     }
 
-    if(mMonitor)
+    if(removedSessions.size() != 0)
     {
-        bool result =  mMonitor->onRemoveEndpoint(i->endpoint);
-        mLogger.getDebugStream() << __FUNCTION__ << "onRemoveEndpoint() returned " << result << std::endl;
+        mListeners->sessionsRemoved(removedSessions);
     }
-
-    mEvents->endpointRemoved(i->endpoint);
-    if(i->endpoint->command)
+    //
+    // TODO: Should be policy driven.
+    //
+    if(mSessions.size() < 2)
     {
-        i->endpoint->command->terminate(mBridgeId);
+        shutdown(current);
     }
-    i->connector->unplug();
-    mEndpoints.erase(i);
-
-    mLogger.getDebugStream() << __FUNCTION__ << ": bridge " << current.adapter->getCommunicator()->identityToString(current.id) <<
-         " now has " << mEndpoints.size() << " endpoints." << std::endl;
 }
 
-Hydra::Core::Endpoint::V1::EndpointSeq Hydra::BridgeService::BridgeImpl::listEndpoints(const Ice::Current& current)
+AsteriskSCF::SessionCommunications::V1::SessionSeq AsteriskSCF::BridgeService::BridgeImpl::listSessions(const Ice::Current& current)
 {
     mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
     boost::shared_lock<boost::shared_mutex> lock(mLock);
     statePreCheck();
-    mLogger.getDebugStream() << __FUNCTION__ << ": working with " << mEndpoints.size() << " endpoints." << std::endl;
-
-    Hydra::Core::Endpoint::V1::EndpointSeq eps;
-    if(mAdminEndpointInList && mAdminEndpoint.get() != 0)
-    {
-        mLogger.getDebugStream() << __FUNCTION__ << ": adding admin endpoint to result set." << std::endl;
-        eps.push_back(mAdminEndpoint->endpoint);
-    }
-    for(std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::const_iterator i = mEndpoints.begin(); i != mEndpoints.end(); ++i)
-    {
-        eps.push_back(i->endpoint);
-    }
-    if(mMonitor)
+    mLogger.getDebugStream() << __FUNCTION__ << ": working with " << mSessions.size() << " sessions." << std::endl;
+    AsteriskSCF::SessionCommunications::V1::SessionSeq result;
+    for(std::vector<AsteriskSCF::BridgeService::BridgeImpl::BridgeSession>::const_iterator i = mSessions.begin(); i != mSessions.end(); ++i)
     {
-        bool result = mMonitor->onListEndpoints(eps, eps);
-        mLogger.getDebugStream() << __FUNCTION__ << "onListEndpoints() returned " << result << std::endl;
+        result.push_back(i->session);
     }
-    mLogger.getDebugStream() << __FUNCTION__ << ": returning " << eps.size() << " endpoints." << std::endl;
-    return eps;
+    return result;
 }
 
-void Hydra::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
 {
     //
     // When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
     // no other internal locks. 
     //
-    std::vector<BridgeEndpoint> copyOfEndpoints;
-    Hydra::Session::V1::SignalCommandsPrx adminCmd;
+    std::vector<BridgeSession> copyOfSessions;
 
     mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
     {
@@ -461,49 +337,33 @@ void Hydra::BridgeService::BridgeImpl::shutdown(const Ice::Current& current)
             throw Ice::ObjectNotExistException(__FILE__, __LINE__);
         }
         mState = ShuttingDown;
-        std::swap(copyOfEndpoints, mEndpoints);
-        if(mAdminEndpoint.get() != 0 && mAdminEndpoint->endpoint->command)
-        {
-            adminCmd = mAdminEndpoint->endpoint->command;
-        }
+        std::swap(copyOfSessions, mSessions);
     }
-    if(mMonitor)
-    {
-        bool result = mMonitor->onShutdown();
-        mLogger.getDebugStream() << __FUNCTION__ << "onShuttingDown() returned " << result << std::endl;
-    }
-    mEvents->shuttingDown();
+    mListeners->stopping();
 
     //
     // TODO: Response code for termination messages for bridges shutting down should come from configuration
     //
-    std::for_each(copyOfEndpoints.begin(), copyOfEndpoints.end(), Hydra::BridgeService::BridgeShutdownFunctor(mBridgeId));
-    try
-    {
-        adminCmd->terminate(mBridgeId);
-    }
-    catch(const Ice::Exception&)
-    {
-        //
-        // Swallow this. The admin endpoint going away might be the cause for the shutdown, in which case an exception from a terminate request would
-        // be expected.
-        //
-    }
-
-    //
-    // Now call terminate on the bridge endpoint.
-    //
+    std::for_each(copyOfSessions.begin(), copyOfSessions.end(),
+            AsteriskSCF::BridgeService::ShutdownImpl(new AsteriskSCF::SessionCommunications::V1::ResponseCode));
 
     mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": is shutdown." << std::endl;
-    mEvents->stopped();
-    if(mMonitor)
-    {
-        bool result = mMonitor->onShutdownComplete();
-        mLogger.getDebugStream() << __FUNCTION__ << "onShutdownComplete() returned " << result << std::endl;
-    }
+    mListeners->stopped();
 }
 
-void Hydra::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
+void AsteriskSCF::BridgeService::BridgeImpl::addListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+        const Ice::Current&)
+{
+    mListeners->addListener(listener);
+}
+
+void AsteriskSCF::BridgeService::BridgeImpl::removeListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+        const Ice::Current&)
+{
+    mListeners->removeListener(listener);
+}
+
+void AsteriskSCF::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
 {
     {
         mLogger.getTraceStream() << __FUNCTION__ << ":" << current.adapter->getCommunicator()->identityToString(current.id) << std::endl;
@@ -511,148 +371,68 @@ void Hydra::BridgeService::BridgeImpl::destroy(const Ice::Current& current)
         if(mState == ShuttingDown)
         {
             mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
-            throw Hydra::System::Component::V1::ShuttingDown();
+            throw AsteriskSCF::System::Component::V1::ShuttingDown();
         }
         if(mState == Destroyed)
         {
             mLogger.getDebugStream() << __FUNCTION__ << ": called when destroyed." << std::endl;
             throw Ice::ObjectNotExistException(__FILE__, __LINE__);
         }
-        if(mMonitor)
-        {
-            bool result = mMonitor->onDestroy();
-            mLogger.getDebugStream() << __FUNCTION__ << "onDestroy() returned " << result << std::endl;
-        }
         mState = Destroyed;
         mLogger.getInfoStream() << current.adapter->getCommunicator()->identityToString(current.id) << ": is now destroyed." << std::endl;
-        mEvents->stopped();
+        mListeners->stopped();
     }
+    
     //
     // Last act is to remove the servant from the map.
     //
     mObjAdapter->remove(current.id);
 }
 
-bool Hydra::BridgeService::BridgeImpl::destroyed()
+bool AsteriskSCF::BridgeService::BridgeImpl::destroyed()
 {
     boost::shared_lock<boost::shared_mutex> lock(mLock);
     mLogger.getDebugStream() << __FUNCTION__ << ": " << (mState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") << std::endl;
     return mState == Destroyed;
 }
 
-void Hydra::BridgeService::BridgeImpl::endpointConnected(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
+void AsteriskSCF::BridgeService::BridgeImpl::sessionConnected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
 {
-    mLogger.getDebugStream() << __FUNCTION__ << ": endpoint connected " << endpointIdToString(id) << std::endl;
+    mLogger.getDebugStream() << __FUNCTION__ << ": session connected " << session->ice_toString() << std::endl;
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    std::map<std::string, Hydra::Session::V1::SessionEndpointPtr>::iterator i = mQueuedEndpoints.find(endpointIdToString(id));
-    if(i != mQueuedEndpoints.end())
+    std::vector<BridgeSession>::iterator i = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(session));
+    if(i != mSessions.end())
     {
-        mEndpoints.push_back(BridgeEndpoint(i->second, mSplicer.connect(endpointIdToString(id), i->second->mediaSession)));
-        mQueuedEndpoints.erase(i);
-    }
-    if(mAdminEndpoint.get() != 0 && mAdminEndpoint->endpoint && mAdminEndpoint->endpoint->callback)
-    {
-        mAdminEndpoint->endpoint->callback->connected(id);
+        i->connector = mSplicer.connect(session);
     }
 }
 
-size_t Hydra::BridgeService::BridgeImpl::endpointTerminated(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id, 
-  const Hydra::Session::V1::ResponseCodePtr& response,
-  const Ice::Current& current)
+size_t AsteriskSCF::BridgeService::BridgeImpl::sessionStopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session, 
+        const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response)
 {
-    mLogger.getDebugStream() << __FUNCTION__ << ": endpoint terminated from " << endpointIdToString(id) << std::endl;
-    size_t endpointCount(0);
-
-    Hydra::Session::V1::SignalCallbackPrx callback;
+    mLogger.getDebugStream() << __FUNCTION__ << ": session terminated from " << session->ice_toString() << std::endl;
+    session->removeListener(mSessionListenerPrx);
 
-    //
-    // Find out if we have an endpoint we need to "unplug"
-    //
-    std::auto_ptr<BridgeEndpoint> ep;
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    std::vector<BridgeSession>::iterator i = find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(session));
+    if(i != mSessions.end())
     {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        ep = removeEndpointImpl(id);
-        endpointCount = mEndpoints.size();
-        if(ep.get() == 0)
-        {
-            mLogger.getDebugStream() << __FUNCTION__ << ": unable to locate terminated endpoint in list of known endpoints! Checking for match on admin endpoint" << std::endl;
-            if(mAdminEndpoint.get() != 0)
-            {
-                if(id->endpointManagerId == mAdminEndpoint->endpoint->id->endpointManagerId && id->destinationId == mAdminEndpoint->endpoint->id->destinationId)
-                {
-                    mLogger.getDebugStream() << __FUNCTION__ << ": terminated endpoint is the admin endpoint. Resetting admin endpoint!" << std::endl;
-                    ep = mAdminEndpoint;
-                    mAdminEndpoint.reset(0);
-                }
-            }
-        }
-        if(mAdminEndpoint.get() != 0)
+        if(i->connector)
         {
-            callback = mAdminEndpoint->endpoint->callback;
-            ++endpointCount;
+            i->connector->unplug();
         }
+        mSessions.erase(i);
     }
 
-    if(callback)
-    {
-        callback->terminated(id, response);
-    }
-
-    //
-    // This endpoint is now "out there" and not known to any other part of the bridge. Disconnecting the media is the last thing we can do.
-    //
-    if(ep.get() != 0 && ep->connector)
-    {
-        try
-        {
-            ep->connector->unplug();
-        }
-        catch(const Ice::Exception&)
-        {
-        }
-    }
-    return endpointCount;
+    return mSessions.size();
 }
 
-void Hydra::BridgeService::BridgeImpl::endpointRinging(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
-{
-    mLogger.getDebugStream() << __FUNCTION__ << ": endpoint ringing from " << endpointIdToString(id) << std::endl;
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if(mAdminEndpoint.get() != 0)
-    {
-        mAdminEndpoint->endpoint->callback->ring(id);
-    }
-    else
-    {
-        std::for_each(mEndpoints.begin(), mEndpoints.end(), Hydra::BridgeService::Broadcaster<Hydra::BridgeService::RingImpl>(id));
-    }
-}
-
-void Hydra::BridgeService::BridgeImpl::endpointBusy(const Hydra::Core::Endpoint::V1::EndpointIdPtr& id)
-{
-    mLogger.getDebugStream() << __FUNCTION__ << ": busy callback from " << endpointIdToString(id) << std::endl;
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if(mAdminEndpoint.get() != 0)
-    {
-        mAdminEndpoint->endpoint->callback->busy(id);
-    }
-    else
-    {
-        std::for_each(mEndpoints.begin(), mEndpoints.end(), Hydra::BridgeService::Broadcaster<Hydra::BridgeService::BusyImpl>(id));
-    }
-}
-
-Hydra::Session::V1::SignalCallbackPtr Hydra::BridgeService::BridgeImpl::signalCB()
-{
-    return mSignallingServant;
-}
-
-void Hydra::BridgeService::BridgeImpl::statePreCheck()
+void AsteriskSCF::BridgeService::BridgeImpl::statePreCheck()
 {
     if(mState == ShuttingDown)
     {
         mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
-        throw Hydra::System::Component::V1::ShuttingDown();
+        throw AsteriskSCF::System::Component::V1::ShuttingDown();
     }
     if(mState == Destroyed)
     {
@@ -661,28 +441,9 @@ void Hydra::BridgeService::BridgeImpl::statePreCheck()
     }
 }
 
-std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator Hydra::BridgeService::BridgeImpl::find(const Hydra::Core::Endpoint::V1::EndpointIdPtr& e)
+std::vector<AsteriskSCF::BridgeService::BridgeImpl::BridgeSession> AsteriskSCF::BridgeService::BridgeImpl::currentSessions()
 {
-    mLogger.getDebugStream() << __FUNCTION__ << ": searching endpoints for " << e->endpointManagerId << ":" << e->destinationId << "." << std::endl;
-    for(std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator i = mEndpoints.begin(); i != mEndpoints.end(); ++i)
-    {
-        if(e->endpointManagerId == i->endpoint->id->endpointManagerId && e->destinationId == i->endpoint->id->destinationId)
-        {
-            return i;
-        }
-    }
-    mLogger.getDebugStream() << __FUNCTION__ << ": endpoint " << e->endpointManagerId << ":" << e->destinationId << " not found." << std::endl;
-    return mEndpoints.end();
-}
-
-std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint> Hydra::BridgeService::BridgeImpl::removeEndpointImpl(const Hydra::Core::Endpoint::V1::EndpointIdPtr& e)
-{
-    std::vector<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>::iterator i = find(e);
-    if(i != mEndpoints.end())
-    {
-        std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint> result(new BridgeEndpoint(*i));
-        mEndpoints.erase(i);
-        return result;
-    }
-    return std::auto_ptr<Hydra::BridgeService::BridgeImpl::BridgeEndpoint>(0);
+    boost::shared_lock<boost::shared_mutex> lock(mLock);
+    return mSessions;
 }
+    
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 3d544b0..242babc 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -1,114 +1,99 @@
+/*
+* Asterisk Scalable Communications Framework
+*
+* Copyright (C) 2010 -- Digium, Inc.
+*
+* All rights reserved.
+*/
 #pragma once
-#ifndef __HYDRA_BRIDGE_SERVICE_IMPL_H
-#define __HYDRA_BRIDGE_SERVICE_IMPL_H
 
-#include <Core/Bridging/BridgeServiceIf.h>
-#include <Core/Bridging/BridgeServiceEventsIf.h>
-#include <Session/SessionIf.h>
+#include <SessionCommunications/Bridging/BridgingIf.h>
+#include <SessionCommunications/SessionCommunicationsIf.h>
 #include <boost/thread/shared_mutex.hpp>
 #include <vector>
 #include "Logger.h"
 #include "MediaSplicer.h"
+#include "BridgeListenerMgr.h"
 
-namespace Hydra
+namespace AsteriskSCF
 {
 namespace BridgeService
 {
-    inline bool checkEndpointId(const Core::Endpoint::V1::EndpointId& e)
+    //
+    // BridgeImpl is an implementation of AsteriskSCF::SessionCommunications::Bridging::V1::Bridge.
+    //
+    class BridgeImpl : public SessionCommunications::Bridging::V1::Bridge
     {
-       if(e.endpointManagerId.size() == 0 || e.destinationId.size() == 0)
+    public:
+        struct BridgeSession
         {
-            return false;
-        }
-        return true;
-    }
-
-    class EventTopicWrapper; 
-
-   //
-   // BridgeImpl is a session oriented bridge. 
-   //
-   class BridgeImpl : public Core::Bridging::V1::Bridge
-   {
-   public:
-      struct BridgeEndpoint
-      {
-          Session::V1::SessionEndpointPtr endpoint;
-          MediaConnectorPtr connector;
-
-          BridgeEndpoint(const Session::V1::SessionEndpointPtr& ep, const MediaConnectorPtr& con) :
-              endpoint(ep),
-              connector(con)
-          {
-          }
-      };
-
-      BridgeImpl(const Ice::ObjectAdapterPtr& objAdapter, const Session::V1::SessionEndpointPtr& adminEp, const Session::V1::SessionEndpointSeq& initialEndpoints,
-        const Core::Bridging::V1::BridgeMonitorPrx& manager, const Core::Bridging::V1::BridgeEventsPrx& ev);
-
-      ~BridgeImpl();
-
-      //
-      // Hydra::Core::Bridging::Bridge Interface
-      //
-      void addEndpoint(const Core::Endpoint::V1::BaseEndpointPtr& ep, const Ice::Current& current);
-      void removeEndpoint(const Core::Endpoint::V1::EndpointIdPtr& ep, const Ice::Current& current);
-      Core::Endpoint::V1::EndpointSeq listEndpoints(const Ice::Current& current);
-      void shutdown(const Ice::Current& current);
-      void destroy(const Ice::Current& current);
-
-      //
-      // Internal methods
-      //
-      bool destroyed();
-
-      void endpointConnected(const Core::Endpoint::V1::EndpointIdPtr& id);
-      size_t endpointTerminated(const Core::Endpoint::V1::EndpointIdPtr& id, const Session::V1::ResponseCodePtr& response, const Ice::Current&);
-      void endpointRinging(const Core::Endpoint::V1::EndpointIdPtr& id);
-      void endpointBusy(const Core::Endpoint::V1::EndpointIdPtr& id);
-
-      Session::V1::SignalCallbackPtr signalCB();
-
-   private:
-
-      boost::shared_mutex mLock;
-      enum ServiceStates
-      { 
-         Running, 
-         ShuttingDown, 
-         Destroyed 
-      };
-      ServiceStates mState;
-
-      std::auto_ptr<BridgeEndpoint> mAdminEndpoint;
-      std::vector<BridgeEndpoint> mEndpoints;
-      Core::Bridging::V1::BridgeMonitorPrx mMonitor;
-      std::auto_ptr<EventTopicWrapper> mEvents;
-
-      std::map<std::string, Session::V1::SessionEndpointPtr> mQueuedEndpoints;
-
-      MediaSplicer mSplicer;
-
-      //
-      // Policy values.
-      //
-      bool mAdminEndpointInList;
-      Logger mLogger;
-
-      Core::Endpoint::V1::EndpointIdPtr mBridgeId;
-
-      Ice::ObjectAdapterPtr mObjAdapter;
-      Session::V1::SignalCallbackPrx mSignalling;
-      Session::V1::SignalCallbackPtr mSignallingServant;
-
-      void statePreCheck();
-      std::vector<BridgeEndpoint>::iterator find(const Core::Endpoint::V1::EndpointIdPtr& e);
-
-      std::auto_ptr<BridgeEndpoint> removeEndpointImpl(const Core::Endpoint::V1::EndpointIdPtr& e);
-   };
-
-   typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
+            SessionCommunications::V1::SessionPrx session;
+            MediaConnectorPtr connector;
+
+            BridgeSession(const SessionCommunications::V1::SessionPrx& s, const MediaConnectorPtr& con) :
+                session(s),
+                connector(con)
+            {
+            }
+        };
+
+        BridgeImpl(const Ice::ObjectAdapterPtr& objAdapter,
+                const SessionCommunications::V1::SessionSeq& initialSessions,
+                const SessionCommunications::Bridging::V1::BridgeListenerPrx& ev,
+                const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr);
+
+        ~BridgeImpl();
+
+        //
+        // AsteriskSCF::SessionCommunications::Bridging::V1::Bridge Interface
+        //
+        void addSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current);
+        void removeSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current);
+
+        AsteriskSCF::SessionCommunications::V1::SessionSeq listSessions(const Ice::Current&);
+        void shutdown(const Ice::Current& current);
+        void destroy(const Ice::Current& current);
+
+        void addListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener, const Ice::Current& current);
+        void removeListener(const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener, const Ice::Current& current);
+
+        //
+        // Internal methods
+        //
+        bool destroyed();
+
+        void sessionConnected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+        size_t sessionStopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
+                const SessionCommunications::V1::ResponseCodePtr& response);
+        
+        std::vector<BridgeSession> currentSessions();
+
+    private:
+
+        boost::shared_mutex mLock;
+        enum ServiceStates
+        { 
+            Running, 
+            ShuttingDown, 
+            Destroyed 
+        };
+        ServiceStates mState;
+
+        std::vector<BridgeSession> mSessions;
+
+        MediaSplicer mSplicer;
+        Logger mLogger;
+
+        const std::string mName;
+        Ice::ObjectAdapterPtr mObjAdapter;
+
+        BridgeListenerMgrPtr mListeners;
+        SessionCommunications::V1::SessionListenerPtr mSessionListener;
+        SessionCommunications::V1::SessionListenerPrx mSessionListenerPrx;
+
+        void statePreCheck();
+    };
+
+    typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
 } // End of namespace Bridging.
-} // End of namespace Hydra.
-
-#endif 
+} // End of namespace AsteriskSCF.
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
new file mode 100644
index 0000000..83e4bc6
--- /dev/null
+++ b/src/BridgeManagerImpl.cpp
@@ -0,0 +1,148 @@
+#include "BridgeManagerImpl.h"
+#include <Ice/Ice.h>
+#include <IceUtil/UUID.h>
+#include <boost/thread/locks.hpp>
+#include "BridgeListenerMgr.h"
+
+//
+// Compiled-in constants.
+// TODO: Replace with configuration!
+//
+
+// Prefix that the BridgeManagerImpl constructor prepends to the manager's name to build
+// the string it passes to the ListenerManager constructor.
+static const std::string TopicPrefix("AsteriskSCF.BridgeManager.");
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    //
+    // Function object applied to each BridgeInfo entry via std::for_each when the
+    // manager shuts down: it calls shutdown() on the entry's servant, passing along
+    // the Ice::Current that was copied in at construction.
+    //
+    class ShutdownImpl : public std::unary_function<BridgeManagerImpl::BridgeInfo, void>
+    {
+    public:
+        ShutdownImpl(const Ice::Current& current) : mCurrent(current) {}
+
+        // Forwards the stored Ice::Current to the bridge servant's shutdown().
+        void operator()(const BridgeManagerImpl::BridgeInfo& info)
+        {
+            info.servant->shutdown(mCurrent);
+        }
+
+    private:
+        const Ice::Current mCurrent; // Held by value: a copy of the constructor argument.
+    };
+
+} // End of namespace BridgeService
+} // End of namespace AsteriskSCF
+
+//
+// Builds a bridge manager: stores the name and object adapter, starts out neither
+// shutting down nor suspended, creates the listener manager from the adapter's
+// communicator and TopicPrefix + name, and registers `listener` when it is non-null.
+//
+AsteriskSCF::BridgeService::BridgeManagerImpl::BridgeManagerImpl(
+    const Ice::ObjectAdapterPtr& adapter,
+    const std::string& name,
+    const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener) :
+    mName(name), mShuttingDown(false), mSuspended(false), mAdapter(adapter)
+{
+    mLogger.getInfoStream() << "Created AsteriskSCF Session-Oriented Bridge Manager." << std::endl;
+
+    const std::string listenerTopic(TopicPrefix + mName);
+    mListeners = new ListenerManager(adapter->getCommunicator(), listenerTopic);
+    if(!listener)
+    {
+        // No initial listener was supplied.
+        return;
+    }
+    mListeners->addListener(listener);
+}
+
+//
+// Creates a new bridge for the given sessions and returns its proxy.
+//
+// The servant is added to mAdapter under a freshly generated "bridge.<UUID>" identity
+// and recorded in mBridges; bridges that report destroyed() are reaped beforehand.
+// Everything after the initial trace message runs under an exclusive lock on mLock.
+//
+// Throws ShuttingDown when mShuttingDown is set and Suspended when mSuspended is set.
+//
+AsteriskSCF::SessionCommunications::Bridging::V1::BridgePrx AsteriskSCF::BridgeService::BridgeManagerImpl::createBridge(
+  const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions,
+  const AsteriskSCF::SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+  const Ice::Current& current)
+{
+    namespace BridgingV1 = AsteriskSCF::SessionCommunications::Bridging::V1;
+    namespace ComponentV1 = AsteriskSCF::System::Component::V1;
+
+    // Stringified identity of the object this call was dispatched to; used in log output.
+    const std::string self(current.adapter->getCommunicator()->identityToString(current.id));
+    mLogger.getTraceStream() << __FUNCTION__ << ":" << self << std::endl;
+
+    boost::unique_lock<boost::shared_mutex> guard(mLock);
+    if(mShuttingDown)
+    {
+        mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
+        throw ComponentV1::ShuttingDown();
+    }
+    if(mSuspended)
+    {
+        mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
+        throw ComponentV1::Suspended();
+    }
+    reap();
+
+    // The listener manager receives a proxy for the bridge's identity before the servant
+    // itself has been created and added to the adapter.
+    std::string bridgeName = "bridge." + IceUtil::generateUUID();
+    Ice::Identity bridgeId(mAdapter->getCommunicator()->stringToIdentity(bridgeName));
+    BridgingV1::BridgePrx bridgePrx(BridgingV1::BridgePrx::uncheckedCast(mAdapter->createProxy(bridgeId)));
+    BridgeListenerMgrPtr listenerMgr(new BridgeListenerMgr(mAdapter->getCommunicator(), bridgeName, bridgePrx));
+
+    BridgeInfo entry;
+    entry.servant = new BridgeImpl(mAdapter, sessions, listener, listenerMgr);
+    Ice::ObjectPrx registered = mAdapter->add(entry.servant, bridgeId);
+    mLogger.getInfoStream() << self << ": creating new bridge " << registered->ice_toString() << "." << std::endl;
+    entry.proxy = BridgingV1::BridgePrx::uncheckedCast(registered);
+    mBridges.push_back(entry);
+    return entry.proxy;
+}
+
+//
+// Registers a bridge manager listener.
+//
+// Forwards to mListeners->addListener(), the same call the constructor makes for its
+// initial listener. A null proxy is ignored, matching the constructor's handling.
+// mLock is taken exclusively, as createBridge() and shutdown() do before touching
+// manager state. The `current` argument is unused.
+//
+void AsteriskSCF::BridgeService::BridgeManagerImpl::addListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    if(listener)
+    {
+        mListeners->addListener(listener);
+    }
+}
+
+//
+// NOTE(review): this is an empty stub. Both arguments are ignored and nothing is removed
+// from mListeners, so a caller cannot unregister a listener through this operation.
+// ListenerManagerT's interface is not visible from this file; confirm which removal call
+// it offers before forwarding to it here.
+//
+void AsteriskSCF::BridgeService::BridgeManagerImpl::removeListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current)
+{
+}
+
+//
+// Returns the proxies of all entries currently held in mBridges, in container order.
+// Only a shared lock on mLock is taken, and no reaping is done here.
+//
+AsteriskSCF::SessionCommunications::Bridging::V1::BridgeSeq
+AsteriskSCF::BridgeService::BridgeManagerImpl::listBridges(const Ice::Current&)
+{
+    typedef std::vector<BridgeInfo>::iterator BridgeIter;
+
+    boost::shared_lock<boost::shared_mutex> readGuard(mLock);
+    AsteriskSCF::SessionCommunications::Bridging::V1::BridgeSeq proxies;
+    BridgeIter cursor = mBridges.begin();
+    while(cursor != mBridges.end())
+    {
+        proxies.push_back(cursor->proxy);
+        ++cursor;
+    }
+    return proxies;
+}
+
+//
+// Shuts the manager down: sets mShuttingDown, reaps bridges that report destroyed(),
+// calls shutdown() on every remaining bridge servant and finally shuts down the
+// communicator of mAdapter. All of this happens while holding mLock exclusively.
+//
+// A call made while already shutting down only logs and returns; a call made while
+// suspended throws Suspended.
+//
+void AsteriskSCF::BridgeService::BridgeManagerImpl::shutdown(const Ice::Current& current)
+{
+    // Stringified identity of the object this call was dispatched to; used in log output.
+    const std::string self(current.adapter->getCommunicator()->identityToString(current.id));
+    mLogger.getTraceStream() << __FUNCTION__ << ":" << self << std::endl;
+
+    boost::unique_lock<boost::shared_mutex> guard(mLock);
+    if(mShuttingDown)
+    {
+        mLogger.getDebugStream() << __FUNCTION__ << ": called when shutting down." << std::endl;
+        return;
+    }
+    if(mSuspended)
+    {
+        mLogger.getDebugStream() << __FUNCTION__ << ": called when suspended." << std::endl;
+        throw AsteriskSCF::System::Component::V1::Suspended();
+    }
+    mLogger.getInfoStream() << self << ": shutting down." << std::endl;
+    mShuttingDown = true;
+    reap();
+
+    ShutdownImpl stopBridge(current);
+    std::for_each(mBridges.begin(), mBridges.end(), stopBridge);
+
+    mAdapter->getCommunicator()->shutdown();
+}
+
+//
+// Removes from mBridges every entry whose servant reports destroyed().
+// No lock is taken here; the callers in this file (createBridge() and shutdown())
+// already hold mLock exclusively when they call it.
+//
+void AsteriskSCF::BridgeService::BridgeManagerImpl::reap()
+{
+    mLogger.getDebugStream() << __FUNCTION__ << ": reaping bridge set of " << mBridges.size() << " bridges." << std::endl;
+    std::vector<BridgeInfo>::iterator i = mBridges.begin();
+    while(i != mBridges.end())
+    {
+        if(i->servant->destroyed())
+        {
+            //
+            // vector::erase() invalidates the iterator it is given, so continue from the
+            // iterator it returns (the element following the erased one). Incrementing
+            // the old iterator is undefined behaviour, and would also skip the element
+            // that moved into the erased slot.
+            //
+            i = mBridges.erase(i);
+        }
+        else
+        {
+            ++i;
+        }
+    }
+    mLogger.getDebugStream() << __FUNCTION__ << ": reaping completed, bridge set size is now " << mBridges.size() << "." << std::endl;
+}
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
new file mode 100644
index 0000000..530ecec
--- /dev/null
+++ b/src/BridgeManagerImpl.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <SessionCommunications/SessionCommunicationsIf.h>
+#include <SessionCommunications/Bridging/BridgingIf.h>
+#include <boost/thread/shared_mutex.hpp>
+#include <vector>
+
+#include "BridgeImpl.h"
+#include "Logger.h"
+#include "ListenerManager.h"
+
+namespace AsteriskSCF
+{
+namespace BridgeService
+{
+    //
+    // Servant for the SessionCommunications::Bridging::V1::BridgeManager interface.
+    // Creates BridgeImpl servants on an object adapter and keeps track of them in
+    // mBridges.
+    //
+    class BridgeManagerImpl : public SessionCommunications::Bridging::V1::BridgeManager
+    {
+    public:
+        
+        //
+        // adapter:  object adapter that bridges created by this manager are added to.
+        // name:     manager name; the implementation appends it to a fixed prefix when
+        //           creating its listener manager.
+        // listener: optional initial listener; a null proxy is skipped.
+        //
+        BridgeManagerImpl(const Ice::ObjectAdapterPtr& adapter, const std::string& name,
+                const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener);
+        
+        //
+        // AsteriskSCF::SessionCommunications::Bridging::V1::BridgeManager Interface
+        //
+
+        //
+        // Creates a bridge, adds its servant to the adapter and returns its proxy.
+        // The implementation throws ShuttingDown or Suspended (from
+        // AsteriskSCF::System::Component::V1) when the manager is in either state.
+        // Note: the first parameter is named `sessions` in the .cpp definition.
+        //
+        SessionCommunications::Bridging::V1::BridgePrx createBridge(
+            const SessionCommunications::V1::SessionSeq& endpoints,
+            const SessionCommunications::Bridging::V1::BridgeListenerPrx& listener,
+            const Ice::Current& current);
+        
+        // Listener registration operations of the BridgeManager interface.
+        void addListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current);
+        void removeListener(const SessionCommunications::Bridging::V1::BridgeManagerListenerPrx& listener, const Ice::Current& current);
+
+        // Returns the proxies of the bridges currently recorded in mBridges.
+        SessionCommunications::Bridging::V1::BridgeSeq listBridges(const Ice::Current& current);
+
+        // Shuts down all tracked bridges and then the adapter's communicator.
+        void shutdown(const Ice::Current& current);
+
+        //
+        // One tracked bridge: the servant together with the proxy handed out for it.
+        // Declared in the public section; the shutdown functor in BridgeManagerImpl.cpp
+        // names this type from outside the class.
+        //
+        struct BridgeInfo
+        {
+            BridgeImplPtr servant;
+            SessionCommunications::Bridging::V1::BridgePrx proxy;
+        };
+        
+    private:
+        
+        // createBridge() and shutdown() lock this exclusively; listBridges() takes a shared lock.
+        boost::shared_mutex mLock;
+        std::string mName;
+        std::vector<BridgeInfo> mBridges;
+        // Set by shutdown(); once set, createBridge() throws ShuttingDown.
+        bool mShuttingDown;
+        // Initialised to false by the constructor; checked by createBridge() and shutdown().
+        bool mSuspended;
+        Ice::ObjectAdapterPtr mAdapter;
+        Logger mLogger;
+        typedef ListenerManagerT<SessionCommunications::Bridging::V1::BridgeManagerListenerPrx> ListenerManager;
+        typedef IceUtil::Handle<ListenerManager> ListenerManagerPtr;
+        // Created in the constructor body.
+        ListenerManagerPtr mListeners;
+        // Drops entries of mBridges whose servant reports destroyed(); takes no lock itself.
+        void reap();
+    };
+
+    typedef IceUtil::Handle<BridgeManagerImpl> BridgeManagerImplPtr;
+};
+};
diff --git a/src/BridgeServiceImpl.h b/src/BridgeServiceImpl.h
index cca9add..12aa7e6 100644
--- a/src/BridgeServiceImpl.h
+++ b/src/BridgeServiceImpl.h
@@ -2,9 +2,9 @@
 #ifndef __HYDRA_BRIDGE_SERVICE_IMPL_H
 #define __HYDRA_BRIDGE_SERVICE_IMPL_H
 
-#include <Hydra/Telephony/Bridging/BridgeService.h>
+#include <AsteriskSCF/Telephony/Bridging/BridgeService.h>
 
-namespace Hydra
+namespace AsteriskSCF
... 739 lines suppressed ...


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list