[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "single-build-dir" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue Apr 26 09:05:29 CDT 2011


branch "single-build-dir" has been created
        at  ba42f30351e094f5fe76b24a496d7e1baa3a11a6 (commit)

- Log -----------------------------------------------------------------
commit ba42f30351e094f5fe76b24a496d7e1baa3a11a6
Author: Kevin P. Fleming <kpfleming at digium.com>
Date:   Mon Apr 25 17:49:26 2011 -0500

    Changes to work with new single-build-directory CMake script.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9979a32..e8432c0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -16,9 +16,6 @@ if(NOT integrated_build STREQUAL "true")
    # Include common AsteriskSCF build infrastructure
    include(cmake/AsteriskSCF.cmake)
 
-   # This project is C++ based and requires a minimum of 3.4
-   asterisk_scf_project(BridgingService 3.4 CXX)
-
    # Take care of slice definitions
    add_subdirectory(slice)
 
@@ -27,6 +24,8 @@ if(NOT integrated_build STREQUAL "true")
    add_subdirectory(logger)
 endif()
 
+asterisk_scf_project(BridgingService 3.4)
+
 add_subdirectory(src)
 add_subdirectory(config)
 add_subdirectory(test)
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 37d7b8c..8aa0cf7 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -108,13 +108,13 @@ ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
 #
 # The IceBox entries for loading the services.
 #
-IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
-IceBox.Service.Replicator=../src at BridgeReplicator:create
-IceBox.Service.TestBridge=../src at bridgeservice:create
-IceBox.Service.TestBridge2=../src/@bridgeservice:create
-IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
-IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
-IceBox.Service.TestDriver=../test at bridge_component_test:create
+IceBox.Service.Logger=logging-service:createLoggingService
+IceBox.Service.Replicator=BridgeReplicator:create
+IceBox.Service.TestBridge=bridgeservice:create
+IceBox.Service.TestBridge2=bridgeservice:create
+IceBox.Service.TestServiceLocator=service_locator:create
+IceBox.Service.TestChannel=test_channel:create
+IceBox.Service.TestDriver=bridge_component_test:create
 
 #
 # The bridging service uses the test channel`s Endpoint
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 2dfcc0e..b74c1e9 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -34,7 +34,7 @@ target_link_libraries(bridge_component_test test-channel-api)
 
 # component tests only work for integrated builds
 if(integrated_build STREQUAL "true")
-  icebox_add_test(bridge_component_test ../config/test_bridging.conf)
+  asterisk_scf_test_icebox(bridge_component_test config/test_bridging.conf)
 endif()
 
 asterisk_scf_component_init(bridge_unit_tests CXX)

commit 665b6a29b567c13c6bbce45fab4ca422bf2e17aa
Author: Joshua Colp <jcolp at digium.com>
Date:   Thu Apr 21 13:35:25 2011 -0300

    Add support for indicate.

diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 2cf579f..b55ef67 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -42,105 +42,85 @@ public:
     {
     }
 
-    void connected(const SessionPrx& source, const Ice::Current&)
+    void indicated(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+        const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication, const Ice::Current& current)
     {
-        string proxyString = source->ice_toString();
-        mLogger(Debug) << FUNLOG << ": session connected " << proxyString;
-
-        try
-        {
-            SessionWrapperPtr session = mSessions->getSession(source);
-            if (!session)
-            {
-                mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
-                              << proxyString << " that does not match known sessions. Possible out of order operations.";
-                return;
-            }
-
-            session->setup();
+	AsteriskSCF::SessionCommunications::V1::ConnectedIndicationPtr connected;
+	AsteriskSCF::SessionCommunications::V1::RingingIndicationPtr ringing;
+	AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped;
+
+	if ((connected = AsteriskSCF::SessionCommunications::V1::ConnectedIndicationPtr::dynamicCast(indication)))
+	{
+	    string proxyString = source->ice_toString();
+	    mLogger(Debug) << FUNLOG << ": session connected " << proxyString;
+
+	    try
+	    {
+		SessionWrapperPtr session = mSessions->getSession(source);
+		if (!session)
+		{
+		    mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+				  << proxyString << " that does not match known sessions. Possible out of order operations.";
+		    return;
+		}
+
+		session->setup();
             
-            ConnectSessionOperation connector(source, mLogger);
-            mSessions->visitSessions(connector);
-        }
-        catch (const Ice::Exception& ex)
-        {
-            mLogger(Debug) << FUNLOG << ": " << ex.what();
-            throw;
-        }
-    }
-
-    void flashed(const SessionPrx& source, const Ice::Current&)
-    {
-        //
-        // TODO: Is there something to be done here?
-        // 
-    }
-
-    void held(const SessionPrx& source, const Ice::Current&)
-    {
-        //
-        // TODO: Should this affect the media operations.
-        // 
-    }
-
-    void progressing(const SessionPrx& source, 
-      const ResponseCodePtr& response, const Ice::Current&)
-    {
-        //
-        // TODO: Is there something to be done here?
-        // 
-    }
-
-    void ringing(const SessionPrx& source, const Ice::Current&)
-    {
-        //
-        // TODO: Who gets the ring notifications will likely depend on configuration, etc.
-        //
-        RingSessionOperation ringer(source, mLogger);
-        mSessions->visitSessions(ringer);
-    }
-
-    void stopped(const SessionPrx& source, const ResponseCodePtr& response, const Ice::Current& current)
-    {
-        string proxyString = source->ice_toString();
-        mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
-
-        //
-        // IMPLNOTE: The AMI approach.
-        // Stack some operations in a callback. In this case it will be:
-        // 1. getSession()
-        // 2. removeBridge()
-        // 3. disconnect() or destroy()
-        //
-        // The latter two should be coupled as a transaction. One way to go about this would be to replicate the
-        // operations that need to be done together and checkpoint after each one completes. Actually, there are lots of
-        // ways to go about it. Need to think on this more.
-        //
-        // Q. What is the proper order of operations here, should removeBridge be called before or after we
-        // "disconnect" the session? 
-        //
-        SessionWrapperPtr session;
-        session = mSessions->getSession(source);
-        if (!session)
-        {
-            mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
-                          << proxyString << " that does not match known sessions. Possible out of order operations.";
-            return;
-        }
-
-        //
-        // We don't actually have the proxy for this object, but we can create one on the fly.
-        //
-        SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
-
-        //
-        // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
-        //
-        session->shutdown(listenerPrx, new ResponseCode);
-    }
-
-    void unheld(const SessionPrx& source, const Ice::Current&)
-    {
+		ConnectSessionOperation connector(source, mLogger);
+		mSessions->visitSessions(connector);
+	    }
+	    catch (const Ice::Exception& ex)
+	    {
+		mLogger(Debug) << FUNLOG << ": " << ex.what();
+		throw;
+	    }
+	}
+	else if ((ringing = AsteriskSCF::SessionCommunications::V1::RingingIndicationPtr::dynamicCast(indication)))
+	{
+	    //
+	    // TODO: Who gets the ring notifications will likely depend on configuration, etc.
+	    //
+	    RingSessionOperation ringer(source, mLogger);
+	    mSessions->visitSessions(ringer);
+	}
+	else if ((stopped = AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr::dynamicCast(indication)))
+	{
+	    string proxyString = source->ice_toString();
+	    mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
+
+	    //
+	    // IMPLNOTE: The AMI approach.
+	    // Stack some operations in a callback. In this case it will be:
+	    // 1. getSession()
+	    // 2. removeBridge()
+	    // 3. disconnect() or destroy()
+	    //
+	    // The latter two should be coupled as a transaction. One way to go about this would be to replicate the
+	    // operations that need to be done together and checkpoint after each one completes. Actually, there are lots of
+	    // ways to go about it. Need to think on this more.
+	    //
+	    // Q. What is the proper order of operations here, should removeBridge be called before or after we
+	    // "disconnect" the session? 
+	    //
+	    SessionWrapperPtr session;
+	    session = mSessions->getSession(source);
+	    if (!session)
+	    {
+		mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+			      << proxyString << " that does not match known sessions. Possible out of order operations.";
+		return;
+	    }
+
+	    //
+	    // We don't actually have the proxy for this object, but we can create one on the fly.
+	    //
+	    SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
+	    
+	    //
+	    // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
+	    //
+	    session->shutdown(listenerPrx, new ResponseCode);
+	}
     }
 
 private:
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 0e982aa..56a5c5a 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -70,7 +70,7 @@ void RingSessionOperation::operator()(const SessionWrapperPtr& session)
             //
             // TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
             //
-            s->ring();
+	    s->indicate(new AsteriskSCF::SessionCommunications::V1::RingIndication());
         }
         catch (const Ice::ObjectNotExistException& ex)
         {
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index a13dd18..a4da471 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -343,10 +343,11 @@ void SessionWrapper::connect()
         // care of dealing with the AMI callbacks. More accurately, there is a class wide singleton for the callback
         // object and the SessionWrapper itself is passed as the cookie. No further "book-keeping" is required.
         //
-        mSession->session->begin_connect(
-            newCallback_Session_connect(new ConnectRequestCallback(this, mLogger),
-                    &ConnectRequestCallback::connectCB,
-                    &ConnectRequestCallback::failureCB));
+        mSession->session->begin_indicate(
+	    new AsteriskSCF::SessionCommunications::V1::ConnectIndication(),
+            newCallback_Session_indicate(new ConnectRequestCallback(this, mLogger),
+		&ConnectRequestCallback::connectCB,
+		&ConnectRequestCallback::failureCB));
     }
 }
 
@@ -359,7 +360,7 @@ void SessionWrapper::ring()
             return;
         }
     }
-    tryOneWay(mSession->session)->ring();
+    tryOneWay(mSession->session)->indicate(new AsteriskSCF::SessionCommunications::V1::RingIndication());
 }
 
 bool SessionWrapper::setConnected()

commit 99fbea34400b3eedcbee29014104cf60b332555a
Author: Brent Eagles <beagles at digium.com>
Date:   Tue Apr 19 10:56:02 2011 -0230

    Address shadow warnings in recent commits.

diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 890d0b3..986aa73 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -106,7 +106,7 @@ public:
     
     void activate();
     string id();
-    SessionCollectionPtr sessions();
+    SessionCollectionPtr getSessions();
 
     void forceUpdate();
 
@@ -400,7 +400,7 @@ public:
 protected:
     bool executeImpl()
     {
-        if (mBridge->sessions()->size() < 2 && mPrx)
+        if (mBridge->getSessions()->size() < 2 && mPrx)
         {
             mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
                             &CheckShutdown::failed));
@@ -908,7 +908,7 @@ string BridgeImpl::id()
     return mState->bridgeId;
 }
 
-SessionCollectionPtr BridgeImpl::sessions()
+SessionCollectionPtr BridgeImpl::getSessions()
 {
     return mSessions;
 }
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 118686a..f764355 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -105,7 +105,7 @@ public:
 
     virtual std::string id() = 0;
 
-    virtual SessionCollectionPtr sessions() = 0;
+    virtual SessionCollectionPtr getSessions() = 0;
 
     /**
      *
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 4ce7a1b..be58b67 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -79,7 +79,7 @@ public:
     
     void activate();
 
-    string id()
+    string getID()
     {
         return mName;
     }
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index f9f2c71..dd7fad3 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -50,7 +50,7 @@ public:
 
     virtual void activate() = 0;
 
-    virtual std::string id() = 0;
+    virtual std::string getID() = 0;
 
     virtual void createBridgeReplica(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
 
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index 0f47d91..5186463 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -62,7 +62,7 @@ public:
                     {
                         if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
                         {
-                            SessionCollectionPtr sessions = (*b)->sessions();
+                            SessionCollectionPtr sessions = (*b)->getSessions();
                             sessions->removeSession(bridgedSessionItem);
                             //
                             // Keep the session list clean.
@@ -147,7 +147,7 @@ public:
             BridgeManagerStateItemPtr managerItem = BridgeManagerStateItemPtr::dynamicCast((*i));
             if (managerItem)
             {
-                if(managerItem->key == mManager->id())
+                if(managerItem->key == mManager->getID())
                 {
                     //
                     // There is only on bridge per listener instance by design/implementation, so this
@@ -198,7 +198,7 @@ public:
                 {
                     if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
                     {
-                        SessionCollectionPtr sessions = (*b)->sessions();
+                        SessionCollectionPtr sessions = (*b)->getSessions();
                         sessions->replicaUpdate(bridgedSessionItem);
                         //
                         // Keep the session list clean.
@@ -226,7 +226,7 @@ public:
                 {
                     if ((*b) && (*b)->id() == sessionPairing->bridgeKey)
                     {
-                        SessionWrapperPtr session = (*b)->sessions()->getSession(sessionPairing->sessionKey);
+                        SessionWrapperPtr session = (*b)->getSessions()->getSession(sessionPairing->sessionKey);
                         if (session)
                         {
                             session->updateMedia(sessionPairing);
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index b0dd7f1..233df9e 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -190,48 +190,48 @@ public:
             const IncomingPairings& incoming)
     {
         mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
-        SessionPairingPtr update;
+        SessionPairingPtr currentState;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
             mPeer = peer;
             mOutgoing = outgoing;
             mIncoming = incoming;
-            update = createUpdate();
+            currentState = createUpdate();
         }
-        pushUpdate(update);
+        pushUpdate(currentState);
     }
 
     SessionPairingPtr createUpdate()
     {
         if (mReplicator)
         {
-            SessionPairingPtr update(new SessionPairing);
-            update->mediaSession = mMedia;
-            update->bridgeKey = mBridgeId;
-            update->sessionKey = mSessionId;
-            update->key = mKey;
-            update->serial = ++mRepCounter;
+            SessionPairingPtr currentState(new SessionPairing);
+            currentState->mediaSession = mMedia;
+            currentState->bridgeKey = mBridgeId;
+            currentState->sessionKey = mSessionId;
+            currentState->key = mKey;
+            currentState->serial = ++mRepCounter;
             for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
             {
-                update->outgoingMediaPairings.push_back(new MediaPairing(i->second, i->first));
+                currentState->outgoingMediaPairings.push_back(new MediaPairing(i->second, i->first));
             }
             for (vector<IncomingPairing>::iterator i = mIncoming.begin(); i != mIncoming.end(); ++i)
             {
-                update->incomingMediaPairings.push_back(new MediaPairing(i->first, i->second));
+                currentState->incomingMediaPairings.push_back(new MediaPairing(i->first, i->second));
             }
-            return update;
+            return currentState;
         } 
         return 0;       
     }
 
-    void pushUpdate(const SessionPairingPtr& update)
+    void pushUpdate(const SessionPairingPtr& newState)
     {
-        if (mReplicator && update)
+        if (mReplicator && newState)
         {
             try
             {
                 ReplicatedStateItemSeq seq;
-                seq.push_back(update);
+                seq.push_back(newState);
                 mReplicator->setState(seq);
             }
             catch (const std::exception& ex)
@@ -253,7 +253,7 @@ public:
     {
         vector<OutgoingPairing> outgoing;
         vector<IncomingPairing> incoming;
-        SessionPairingPtr update;
+        SessionPairingPtr newState;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
             outgoing = mOutgoing;
@@ -261,9 +261,9 @@ public:
             incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
-            update = createUpdate();
+            newState = createUpdate();
         }
-        pushUpdate(update);
+        pushUpdate(newState);
         if (outgoing.size() == 0 && incoming.size() == 0)
         {
             return;
@@ -277,7 +277,8 @@ public:
         //
         for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
         {
-            mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and " << i->first->ice_toString();
+            mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and "
+                           << i->first->ice_toString();
 
             try
             {
@@ -318,17 +319,17 @@ public:
         }
     }
 
-    void update(const SessionPairingPtr& update)
+    void update(const SessionPairingPtr& newState)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mRepCounter = update->serial;
+        mRepCounter = newState->serial;
         mOutgoing.clear();
         mIncoming.clear();
-        for (MediaPairingSeq::iterator i = update->outgoingMediaPairings.begin(); i != update->outgoingMediaPairings.end(); ++i)
+        for (MediaPairingSeq::iterator i = newState->outgoingMediaPairings.begin(); i != newState->outgoingMediaPairings.end(); ++i)
         {
             mOutgoing.push_back(OutgoingPairing((*i)->sink, (*i)->source));
         }
-        for (MediaPairingSeq::iterator i = update->incomingMediaPairings.begin(); i != update->incomingMediaPairings.end(); ++i)
+        for (MediaPairingSeq::iterator i = newState->incomingMediaPairings.begin(); i != newState->incomingMediaPairings.end(); ++i)
         {
             mIncoming.push_back(IncomingPairing((*i)->source, (*i)->sink));
         }
@@ -339,7 +340,7 @@ public:
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
         vector<OutgoingPairing> outgoing;
         vector<IncomingPairing> incoming;
-        SessionPairingPtr update;
+        SessionPairingPtr newState;
         {
             boost::unique_lock<boost::shared_mutex> lock(mLock);
             outgoing = mOutgoing;
@@ -347,9 +348,9 @@ public:
             incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
-            update = createUpdate();
+            newState = createUpdate();
         }
-        pushUpdate(update);
+        pushUpdate(newState);
         if (outgoing.size() == 0 && incoming.size() == 0)
         {
             return false;
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 336ab16..a13dd18 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -64,29 +64,29 @@ public:
         {
             ex.ice_throw();
         }
-        catch (const Ice::RequestFailedException& ex)
+        catch (const Ice::RequestFailedException& x)
         {
             //
             // These exceptions indicate something is wrong with the request on this object. We should not expect that
             // it could ever exceed. ObjectNotExistException is included in this catch.
             // TODO- it might be reasonable to disconnect this session for this!
             //
-            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << x.what();
             mSession->destroy();
         }
-        catch (const Ice::SocketException& ex)
+        catch (const Ice::SocketException& x)
         {
             //
             // A possible retry scenario.
             //
-            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << x.what();
         }
-        catch (const std::exception& ex)
+        catch (const std::exception& x)
         {
             //
             // Not much to be done here except log the error and move on.
             //
-            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+            mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << x.what();
         }
    }
     
@@ -365,17 +365,17 @@ void SessionWrapper::ring()
 bool SessionWrapper::setConnected()
 {
     mLogger(Debug) << FUNLOG << " for " << mId;
-    BridgedSessionPtr update;
+    BridgedSessionPtr stateUpdate;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         if (mSession->currentState == Added)
         {
             mSession->currentState = Connected;
-            update = createUpdate();
+            stateUpdate = createUpdate();
         }
     }
-    pushUpdate(update);
-    return (update != 0);
+    pushUpdate(stateUpdate);
+    return (stateUpdate != 0);
 }
 
 BridgedSessionPtr SessionWrapper::getBridgedSession() const
@@ -393,17 +393,17 @@ SessionPrx SessionWrapper::getSession() const
 void SessionWrapper::setupMedia()
 {
     mLogger(Debug) << FUNLOG << " for " << mId;
-    BridgedSessionPtr update;
+    BridgedSessionPtr stateUpdate;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         if (mSession->currentState == Added)
         {
             mSplicer->connect(this);
             mSession->currentState = Connected;
-            update = createUpdate();
+            stateUpdate = createUpdate();
         }
     }
-    pushUpdate(update);
+    pushUpdate(stateUpdate);
 }
 
 void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
@@ -432,7 +432,7 @@ void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
 void SessionWrapper::disconnect()
 {
     mLogger(Debug) << FUNLOG << ": disconnecting " << mId;
-    BridgedSessionPtr update;
+    BridgedSessionPtr stateUpdate;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         if (mSession->currentState != Disconnected &&
@@ -440,31 +440,31 @@ void SessionWrapper::disconnect()
         {
             unplugMedia();
             mSession->currentState = Disconnected;
-            update = createUpdate();
+            stateUpdate = createUpdate();
         }
     }
-    pushUpdate(update);
+    pushUpdate(stateUpdate);
 }
 
-void SessionWrapper::update(const BridgedSessionPtr& update)
+void SessionWrapper::update(const BridgedSessionPtr& newState)
 {
     mLogger(Debug) << FUNLOG << ": for " << mId;
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if (mSession->serial < update->serial || update->serial < SerialCounterStart)
+    if (mSession->serial < newState->serial || newState->serial < SerialCounterStart)
     {
-        mSession = update;
+        mSession = newState;
     }
     else
     {
-        mLogger(Info) << ": detected out of order replication update for " << update->key <<
-            "(" << mSession->serial << " to " << update->serial << ")";
+        mLogger(Info) << ": detected out of order replication update for " << newState->key <<
+            "(" << mSession->serial << " to " << newState->serial << ")";
     }
 }
 
 void SessionWrapper::destroy()
 {
     mLogger(Debug) << FUNLOG << ": for " << mId;
-    BridgedSessionPtr update;
+    BridgedSessionPtr newState;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         if (mSession->currentState != Done)
@@ -475,10 +475,10 @@ void SessionWrapper::destroy()
             //
             unplugMedia();
             mSession->currentState = Done;
-            update = createUpdate();
+            newState = createUpdate();
         }
     }
-    pushUpdate(update);
+    pushUpdate(newState);
 }
 
 bool SessionWrapper::isDestroyed()
@@ -515,16 +515,16 @@ void SessionWrapper::shutdown(const SessionListenerPrx& listener, const Response
 void SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
 {
     mLogger(Debug) << FUNLOG << ": updating state " << mId;
-    BridgedSessionPtr update;
+    BridgedSessionPtr copyOfNewState;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         //
         // TODO:
         //
         mSession->currentState = newState;
-        update = createUpdate();
+        copyOfNewState = createUpdate();
     }
-    pushUpdate(update);
+    pushUpdate(copyOfNewState);
 }
 
 void SessionWrapper::unplugMedia()
@@ -542,12 +542,12 @@ void SessionWrapper::unplugMedia()
     }
  }
 
-void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+void SessionWrapper::pushUpdate(const BridgedSessionPtr& newState)
 {
-    if (update && mReplicator)
+    if (newState && mReplicator)
     {
         ReplicatedStateItemSeq seq;
-        seq.push_back(update);
+        seq.push_back(newState);
         mReplicator->setState(seq);
     }
 }
diff --git a/src/Tasks.h b/src/Tasks.h
index bfaa6d5..57898b3 100644
--- a/src/Tasks.h
+++ b/src/Tasks.h
@@ -163,8 +163,8 @@ protected:
     {
     }
 
-    QueuedTask(const std::string& name) :
-        mName(name)
+    QueuedTask(const std::string& nameStr) :
+        mName(nameStr)
     {
     }
 
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index b43506a..41ee52c 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -73,10 +73,10 @@ public:
         // usage is a little more expensive in that you are always setting whether it's set or not, but it 
         // avoids having the "if" statement.
         //
-        Ice::PropertiesPtr properties = mCommunicator->getProperties();
-        propGetSet(properties, "TestUtilAdapter.ThreadPool.Size", "4");
-        propGetSet(properties, "TestUtilAdapter.ThreadPool.Size", "4");
-        propGetSet(properties, "TestUtilAdapter.Endpoints", "default");
+        Ice::PropertiesPtr props = mCommunicator->getProperties();
+        propGetSet(props, "TestUtilAdapter.ThreadPool.Size", "4");
+        propGetSet(props, "TestUtilAdapter.ThreadPool.Size", "4");
+        propGetSet(props, "TestUtilAdapter.Endpoints", "default");
         lookupProxies();
     }
 
@@ -247,7 +247,7 @@ public:
         }
     }
 
-    AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx locator()
+    AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx getLocator()
     {
         return mLocatorPrx;
     }

commit bae047c389685f743ff52036ee82429913bfe775
Author: Brent Eagles <beagles at digium.com>
Date:   Tue Apr 19 09:32:21 2011 -0230

    Merging async-bridging and bridge-replication branches to master.
    Change summary:
     - Added active->standby state replication of bridge related state.
     - Added a replication service component for the above.
     - Added some component test suite modifications for testing replication.
     - The bridge service now uses AMI and AMD for key, potentially long running
       methods.
     - The bridge service implementation employs a preliminary version of
       "workqueues". This will need to be converted to use the common Asterisk
        SCF workqueue facility when it is completed.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 498a40a..9979a32 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ if(NOT integrated_build STREQUAL "true")
    include(cmake/AsteriskSCF.cmake)
 
    # This project is C++ based and requires a minimum of 3.4
-   asterisk_scf_project("bridging service" 3.4 CXX)
+   asterisk_scf_project(BridgingService 3.4 CXX)
 
    # Take care of slice definitions
    add_subdirectory(slice)
@@ -25,9 +25,6 @@ if(NOT integrated_build STREQUAL "true")
    # logger is integrated into our build
    set(integrated_build true)
    add_subdirectory(logger)
-   set(integrated_build false)
-else()
-   set(bridging_bindir ${CMAKE_CURRENT_BINARY_DIR} PARENT_SCOPE)
 endif()
 
 add_subdirectory(src)
diff --git a/README.txt b/README.txt
index 15d1447..2d1f1b6 100644
--- a/README.txt
+++ b/README.txt
@@ -1,7 +1,7 @@
 ===============================================================================
 ===                   Asterisk SCF - Bridging                               ===
 ===                                                                         ===
-===                   Copyright (C) 2010, Digium, Inc.                      ===
+===                   Copyright (C) 2010-2011, Digium, Inc.                      ===
 ===============================================================================
 
 -------------------------------------------------------------------------------
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index c9e92e0..37d7b8c 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -1,37 +1,163 @@
+################################################################################
+# Configuration file for use with the bridging component's test driver.
+#
+#
+# General configuration
+#
+# We turn off all collocation optimization by default to avoid issues with any services
+# that might have trouble due to AMI/AMD usage.
+#
+Ice.Default.CollocationOptimized=0
+
+#
+# We use a single file for configuration.
+#
+IceBox.InheritProperties=1
+
+#
+# It's important to specify a reasonable size for the client thread pool for services
+# that use AMI/AMD
+# 
+Ice.ThreadPool.Client.Size=4
+
+
+################################################################################
+# The following configuration is required for the the Service Locator component
+# because it loads a collocated instance of IceStorm.
+#
+#
+# We register services with names to help identify them within an IceBox instance.
+# It also helps with identifying properties in a configuration file as each
+# property that is specific to this service will be prefixed with
+# AsteriskSCFIceStorm.
+#
 AsteriskSCFIceStorm.InstanceName=AsteriskSCFIceStorm
-AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 55555
-AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
+
+
+#
+# Configure the IceStorm TopicManager's object adapter.
+#
+AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 10000
+AsteriskSCFIceStorm.TopicManager.ThreadPool.Size=4
+
+#
+# Configure the IceStorm publisher object adapter.
+#
+AsteriskSCFIceStorm.Publish.Endpoints=tcp -p 10001:udp -p 10001
+AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
+
+#
+# This IceStorm instance will not need to persist subscriber/publisher
+# information across process lifetimes.
+# 
 AsteriskSCFIceStorm.Transient=1
 
+#
+# Control TopicManager tracing.
+#
+# 0 = no tracing
+# 1 = trace topic creation, subscription, unsubscription
+# 2 = like 1, but with more detailed subscription information
+# 
+AsteriskSCFIceStorm.Trace.TopicManager=0
+
+#
+# Flush interval in case any any subscribers have subscribed
+# using a batched oneway proxy. (Default is currently 1000ms)
+#
+AsteriskSCFIceStorm.Flush.Timeout=2000
+
+#
+# Finally the proxy that can be used to access this IceStorm instance.
+#
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+
+#
+# Logger service configuration. Proxies to the logger service
+# are obtained through the service locator so specifying a port
+# number is not required.
+#
 LoggerAdapter.Endpoints=default
 AsteriskSCF.LoggingService.Endpoints=default
+AsteriskSCF.LoggingService.ThreadPool.Size=4
 AsteriskSCF.LoggingClient.Endpoints=default
-AsteriskSCF.TestChannelService.Endpoints=default -p 55560
-
-LocatorService.Proxy=LocatorService:default -p 55558
+AsteriskSCF.LoggingClient.ThreadPool.Size=4
 
-ServiceLocatorManagementAdapter.Endpoints=tcp -p 55557
-ServiceLocatorAdapter.Endpoints=tcp -p 55558
 
-ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 55557
+#
+# Service Locator configuration. Usually the service locator
+# and it`s collocated IceStorm instance are the only services
+# that need to have their ports specified. The service locator
+# instantiates two adapters:
+# * The ServiceLocatorAdapter which hosts the servants that implement
+#   the lookup queries for clients wishing to obtain proxies to services.
+# * The ServiceLocatorManagerAdapter which hosts the servants for the
+#   objects that implement the registration management for services
+#
+ServiceLocatorAdapter.Endpoints=tcp -p 4411
+ServiceLocatorAdapter.ThreadPool.Size=4
+ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
+ServiceLocatorManagementAdapter.ThreadPool.Size=4
 
-TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
+#
+# The proxies that clients use to access the Service Locator facilities.
+#
+LocatorService.Proxy=LocatorService:default -p 4411
+ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
 
-IceBox.InheritProperties=1
+#
+# The IceBox entries for loading the services.
+#
 IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
+IceBox.Service.Replicator=../src at BridgeReplicator:create
 IceBox.Service.TestBridge=../src at bridgeservice:create
+IceBox.Service.TestBridge2=../src/@bridgeservice:create
 IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
 IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
-IceBox.Service.TestDriver=../test at bridging_unit_test:create
+IceBox.Service.TestDriver=../test at bridge_component_test:create
+
+#
+# The bridging service uses the test channel`s Endpoint
+# to create test sessions and register them with the bridge.
+# NOTE: this will be changed to be accessible through
+# service discovery in a later branch.
+#
+TestChannel.InstanceName=BridgeTest
 
+#
+# Configuration for the test bridge instances. There are are two: one master
+# (TestBridge) and one standby (TestBridge2).
+# NOTE: These will be changed to be accessible through
+# service discovery in a later branch.
+#
 TestBridge.InstanceName=TestBridge
-TestBridge.BridgeService.Endpoints=default -p 55561
-TestBridge.Proxy=BridgeManager:default -p 55561
-TestChannel.Proxy=TestChannel:default -p 55560
-TestUtilAdapter.Endpoints=default -p 55562
-Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
+TestBridge.ManagerId=TestBridgeManager
+
+TestBridge2.InstanceName=TestBridge2
+TestBridge2.ManagerId=TestBridgeManager2
+TestBridge2.StateReplicatorListener=yes
+
+#
+# Configuration for the bridge state replicator.
+# NOTE: This will be changed to only use service discovery in the a later
+# branch (i.e. it will not be necessary to configure the object adapter
+# or specify the proxy in the future)
+# 
+Replicator.InstanceName=Replicator
 
-IceBox.LoadOrder=TestServiceLocator Logger TestBridge TestChannel TestDriver
+#
+# Some IceBox configuration.
+#
+#
+# Multiple services are loaded in the single IceBox instance. It is necessary to have
+# them start in a specific order so dependencies are available when they are needed.
+#
+IceBox.LoadOrder=TestServiceLocator Logger Replicator TestBridge TestBridge2 TestChannel TestDriver
 
+#
+# Configuring a manager endpoint for the IceBox service allows services to be managed and
+# monitored.
+#
 IceBox.ServiceManager.Endpoints=default -p 56000
+IceBox.ServiceManager.ThreadPool.Size=4
 IceBoxMgr.Proxy=IceBox/ServiceManager:default -p 56000
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index b799ca0..890d0b3 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1,7 +1,7 @@
 /*
  * Asterisk SCF -- An open-source communications framework.
  *
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
  *
  * See http://www.asterisk.org for more information about
  * the Asterisk SCF project. Please do not directly contact
@@ -14,443 +14,506 @@
  * at the top of the source tree.
  */
 #include "BridgeImpl.h"
+
 #include <AsteriskSCF/logger.h>
 #include <AsteriskSCF/System/Component/ComponentServiceIf.h>
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 #include <Ice/Ice.h>
+#include <boost/thread/locks.hpp>
 #include <memory>
 #include <algorithm>
-#include <boost/thread/locks.hpp>
+#include <set>
+#include "ServiceUtil.h"
+#include "DebugUtil.h"
+
+#include "SessionWrapper.h"
+#include "SessionOperations.h"
+#include "SessionListener.h"
 
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
+using namespace std;
 
 /**
  *
- * NOTE: Code must be reviewed/refactored for exception safety/consistency.
- * Operations involving establishing media connections may or may not be
- * catastrophic. An example of a non-catastrophic situation might be a media
- * allocation operation that might immediately fail for transient reasons but
- * can be initialized in the background in a relatively timely fashion (of
- * course this would depend on the context).
+ * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
+ * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
+ * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
+ * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
+ * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
+ * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
+ * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
+ * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
+ * refactoring for replication could have occurred in that development branch. However, replication is a separate task
+ * and it is desirable to have a *relevant* review of it independently of asynchronous support.
  *
  */
 
 namespace
 {
-Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
 
-class RetryPolicy
+/**
+ * 
+ * BridgeImpl is a reference implmentation of the AsteriskSCF::Bridging::V1::Bridge
+ * interface.
+ *
+ **/
+class BridgeImpl : virtual public BridgeServant
 {
 public:
-    RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
-        mMaxRetries(maxRetries),
-        mRetryInterval(intervalInMilliseconds),
-        mCounter(0)
-    {
-    }
+    BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& objAdapter,
+            const vector<BridgeListenerPrx>& listeners,
+            const BridgeListenerMgrPtr& listenerMgr,
+            const ReplicatorSmartPrx& replicator,
+            const BridgeStateItemPtr& state,
+            const Logger& logger);
 
-    bool canRetry()
-    {
-        return mCounter < mMaxRetries;
-    }
+    ~BridgeImpl();
 
-    bool retry()
-    {
-        lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
-        IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
-        ++mCounter;
-        return canRetry();
-    }
+    //
+    // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
+    //
+    void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
+    void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
+    void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
+            const Ice::Current&);
+
+    SessionSeq listSessions(const Ice::Current&);
+    void shutdown(const Ice::Current& current);
+    void destroy(const Ice::Current& current);
+
+    void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+    void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+
+    void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac, const SessionPrx& sessionToReplace,
+            const SessionSeq& newSessions, const Ice::Current& current);
+
+    //
+    // BridgeServant methods
+    //
+    bool destroyed();
+    void destroyImpl();
+    void shutdownImpl(const Ice::Current& current);
+    void activate(const BridgePrx& proxy);
+
+    void updateState(const BridgeStateItemPtr& state);
+    void addListener(const BridgeListenerStateItemPtr& update);
+    void removeListener(const BridgeListenerStateItemPtr& update);
+    
+    void activate();
+    string id();
+    SessionCollectionPtr sessions();
+
+    void forceUpdate();
+
+    void getAddSessionsTasks(QueuedTasks& tasks, const SessionSeq& sessions);
 
 private:
-    size_t mMaxRetries;
-    size_t mRetryInterval;
-    size_t mCounter;
+
+    boost::shared_mutex mLock;
+
+    //
+    // True if not a standby object.
+    //
+    bool mActivated;
+
+    //
+    // The replicated state.
+    //
+    BridgeStateItemPtr mState;
+
+    //
+    // Helper class for dealing with the sessions.
+    //
+    SessionCollectionPtr mSessions;
+
+    const string mName;
+    Ice::ObjectAdapterPtr mObjAdapter;
+
+    BridgeListenerMgrPtr mListeners;
+    ReplicatorSmartPrx mReplicator;
+    
+    //
+    // The bridge's callback implementation for the sessions that get added to it.
+    //
+    SessionListenerPtr mSessionListener;
+    SessionListenerPrx mSessionListenerPrx;
+
+    //
+    // A proxy to this bridge. Used when publishing events.
+    //
+    BridgePrx mPrx;
+    
+    //
+    // The logger object.
+    //
+    Logger mLogger;
+
+    void statePreCheck();
+    BridgeStateItemPtr createUpdate();
+    void pushUpdate(const BridgeStateItemPtr& update);
+    void pushUpdates(const ReplicatedStateItemSeq& updates);
+    BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
+
+    bool replicate()
+    {
+        return (mActivated && mReplicator != 0);
+    }
 };
+typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
 
-void checkSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+//
+// simply checks for nulls at this point.
+//
+static void checkSessions(const SessionSeq& sessions)
 {
     Ice::LongSeq invalidIndexes;
     Ice::Long index = 0;
-    for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); 
-      i != sessions.end(); ++i, ++index)
+    for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i, ++index)
     {
-        if(*i == 0)
+        if (*i == 0)
         {
             invalidIndexes.push_back(index);
         }
     }
-    if(invalidIndexes.size() > 0)
+    if (invalidIndexes.size() > 0)
     {
-        throw AsteriskSCF::SessionCommunications::V1::InvalidSessions(invalidIndexes);
+        throw InvalidSessions(invalidIndexes);
     }
 }
-}
-
-//
-// Compiled in constants.
-// TODO: Replace with configuration!
-//
-
-static const std::string TopicPrefix("AsteriskSCF.Bridge.");
 
-//
-// TODO:
-// Operations that are performed on all bridge sessions might be better done as AMI requests.
-//
-namespace AsteriskSCF
-{
-namespace BridgeService
-{
-//
-// Functor to support using for_each on shutdown.
-//
-class ShutdownImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class SessionsTracker : public IceUtil::Shared
 {
 public:
-    ShutdownImpl(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
-        const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
-        mListener(listener),
-        mResponse(response)
+    SessionsTracker() :
+        mResponses(0)
     {
     }
-
-    void operator()(const BridgeImpl::BridgeSessionPtr& b)
+    
+    void add(const SessionPrx& s)
     {
-        try
-        {
-            b->getSession()->removeBridge(mListener);
-            b->disconnect();
-            b->getSession()->stop(mResponse);
-        }
-        catch(const Ice::ObjectNotExistException& ex)
-        {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-            mNonExistent.push_back(b);
-        }
-        catch(const Ice::Exception& ex)
-        {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-        }
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mSessions.push_back(s);
+        ++mResponses;
+    }
+ 
+    SessionSeq getSessions()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mSessions;
     }
 
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+    void addException(const SessionPrx& session, const Ice::Exception& ex)
     {
-        return mNonExistent;
+        addExceptionMessage(session, ex.what());
     }
-private:
-    AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListener;
-    AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
 
-class ProgressingImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
-    ProgressingImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
-        mResponse(response)
+    void addExceptionMessage(const SessionPrx& session, const string& msg)
     {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        SessionError error;
+        error.failedSession = session;
+        error.message = msg;
+        mExceptions.push_back(error);
+        ++mResponses;
     }
 
-    void operator()(const BridgeImpl::BridgeSessionPtr& b)
+    SessionErrorSeq getExceptions()
     {
-        try
-        {
-            b->getSession()->progress(mResponse);
-        }
-        catch(const Ice::ObjectNotExistException& ex)
-        {
-            mNonExistent.push_back(b);
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-        }
-        catch(const Ice::Exception& ex)
-        {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-        }
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mExceptions;
     }
 
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+    size_t responseCount()
     {
-        return mNonExistent;
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mResponses;
     }
 
 private:
-    AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+    boost::shared_mutex mLock;
+    SessionSeq mSessions;
+    SessionErrorSeq mExceptions;
+    size_t mResponses;
 };
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
 
-class RingImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class RemoveSessionsNotify : public QueuedTask
 {
 public:
-    RingImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude) :
-        mExclude(exclude)
+    RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
+            const SessionsTrackerPtr& tracker) :
+        QueuedTask("RemoveSessionsNotify"),
+        mBridgeListeners(bridgeListeners),
+        mTracker(tracker)
     {
     }
-
-    void operator()(const BridgeImpl::BridgeSessionPtr& b)
+    
+protected:
+    bool executeImpl()
     {
-        if(b->getSession() != mExclude)
+        SessionSeq sessions = mTracker->getSessions();
+        if (!sessions.empty())
         {
-            try
-            {
-                b->ring();
-            }
-            catch(const Ice::ObjectNotExistException& ex)
-            {
-                mNonExistent.push_back(b);
-                lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-            }
-            catch(const Ice::Exception& ex)
-            {
-                lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-            }
+            mBridgeListeners->sessionsRemoved(sessions);
         }
+        return true;
     }
-
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
-    {
-        return mNonExistent;
-    }
-
+           
 private:
-    AsteriskSCF::SessionCommunications::V1::SessionPrx mExclude;
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+    BridgeListenerMgrPtr mBridgeListeners;
+    SessionsTrackerPtr mTracker;
 };
 
-class FlashImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class SetBridgeTask : public QueuedTask
 {
 public:
-    void operator()(const BridgeImpl::BridgeSessionPtr& b)
+    SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+            const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
+        QueuedTask("SetBridgeTask"),
+        mSessionManager(sessionCollection),
+        mBridge(bridge),
+        mSessionListener(listener),
+        mSessions(sessions),
+        mTracker(tracker)
     {
-        try
-        {
-            b->getSession()->flash();
-        }
-        catch(const Ice::ObjectNotExistException& ex)
-        {
-            mNonExistent.push_back(b);
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-        }
-        catch(const Ice::Exception& ex)
-        {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-        }
     }
 
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+protected:
+    bool executeImpl()
     {
-        return mNonExistent;
+        bool tasksDispatched = false;
+        for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+        {
+            SessionWrapperPtr session = mSessionManager->addSession(*i);
+            if (session == 0)
+            {
+                //
+                // This shouldn't happen!
+                //
+                mTracker->addExceptionMessage(*i, "session already added");
+                continue;
+            }
+            tasksDispatched = true;
+            (*i)->begin_setBridge(mBridge, mSessionListener,
+                    newCallback_Session_setBridge(this, &SetBridgeTask::set,
+                            &SetBridgeTask::failed), session);
+        }
+        return !tasksDispatched;
     }
-private:
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
 
-class HoldImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
-    void operator()(const BridgeImpl::BridgeSessionPtr& b)
+    void set(const SessionInfoPtr& info, const SessionWrapperPtr& session)
     {
-        try
+        mTracker->add(session->getSession());
+        if (info->currentState != "ready")
         {
-            b->getSession()->hold();
-        }
-        catch(const Ice::ObjectNotExistException& ex)
-        {
-            mNonExistent.push_back(b);
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+            //
+            // setupMedia is an AMI backed implementation, so should not block here.
+            //
+            session->setupMedia();
         }
-        catch(const Ice::Exception& ex)
+        if (mTracker->responseCount() == mSessions.size())
         {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+            mListener->succeeded();
         }
     }
 
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
-    {
-        return mNonExistent;
-    }
-private:
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-
-class UnholdImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
-    void operator()(const BridgeImpl::BridgeSessionPtr& b)
+    void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
     {
+        //
+        // TODO:
+        // * Log exception.
+        // * Rollback
+        // * Interpret exception and decide whether or not to fail the entire operations.
+        // Currently the semantics allow some operations to fail.
+        //
+        mTracker->addException(session->getSession(), ex);
         try
         {
-            b->getSession()->unhold();
+            //
+            // We want to make sure that session is laying around. The session collection will
+            // take care of cleaning it up as long as it is marked as destroyed.
+            //
+            session->destroy();
         }
-        catch(const Ice::ObjectNotExistException& ex)
+        catch(...)
         {
-            mNonExistent.push_back(b);
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
         }
-        catch(const Ice::Exception& ex)
+        if (mTracker->responseCount() == mSessions.size())
         {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-        }
-    }
-
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
-    {
-        return mNonExistent;
-    }
-private:
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-
-class ConnectImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
-    ConnectImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude) :
-        mExclude(exclude)
-    {
-    }
-
-    void operator()(BridgeImpl::BridgeSessionPtr& b)
-    {
-        if(b->getSession() != mExclude)
-        {
-            try
-            {
-                b->connect();
-            }
-            catch(const Ice::ObjectNotExistException& ex)
+            SessionErrorSeq exceptions = mTracker->getExceptions();
+            if (exceptions.size() == mSessions.size())
             {
-                mNonExistent.push_back(b);
-                lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+                mListener->failed();
             }
-            catch(const Ice::Exception& ex)
+            else
             {
-                lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+                mListener->succeeded();
             }
         }
     }
-
-    const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
-    {
-        return mNonExistent;
-    }
-
+    
 private:
-    AsteriskSCF::SessionCommunications::V1::SessionPrx mExclude;
-    std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+    SessionCollectionPtr mSessionManager;
+    BridgePrx mBridge;
+    SessionListenerPrx mSessionListener;
+    SessionSeq mSessions;
+    SessionsTrackerPtr mTracker;
 };
 
-class FindImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, bool>
+class AddToListeners : public QueuedTask
 {
 public:
-    FindImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) :
-        mPrx(prx)
+    AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker) :
+        QueuedTask("AddToListeners"),
+        mListeners(listeners),
+        mTracker(tracker)
     {
     }
-
-    bool operator()(const BridgeImpl::BridgeSessionPtr& b)
+    
+protected:
+    bool executeImpl()
     {
-        return b->getSession() == mPrx;
+        mListeners->sessionsAdded(mTracker->getSessions());
+        return true;
     }
+    
 private:
-    AsteriskSCF::SessionCommunications::V1::SessionPrx mPrx;
+    BridgeListenerMgrPtr mListeners;
+    SessionsTrackerPtr mTracker;
 };
 
-
-//
-// For events that require modification to the bridge, we use helper methods on the bridge itself.
-// For events result in distribution to the bridge sessions, we copy the current sessions and
-// run the calls from the listener itself.
-//
-class SessionListener : public AsteriskSCF::SessionCommunications::V1::SessionListener
+class CheckShutdown : public QueuedTask
 {
 public:
-    SessionListener(const BridgeImplPtr& b) :
-        mBridge(b)
+    CheckShutdown(const BridgeImplPtr& bridge, const BridgePrx& proxy) :
+        QueuedTask("CheckShutdown"),
+        mBridge(bridge),
+        mPrx(proxy)
     {
     }
 
-    void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
+protected:
+    bool executeImpl()
     {
-        try
-        {
-            mBridge->sessionConnected(source);
-        }
-        catch(const Ice::Exception& ex)
+        if (mBridge->sessions()->size() < 2 && mPrx)
         {
-            lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
-            throw;
+            mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
+                            &CheckShutdown::failed));
         }
-        std::vector<BridgeImpl::BridgeSessionPtr> sessions(mBridge->currentSessions());
-        std::for_each(sessions.begin(), sessions.end(), ConnectImpl(source));
+        //
+        // We don't care about the result really. The CheckShutdown instance will hang
+        // around because of the AMI request so the completion of the request will not
+        // have an issue.
+        //
+        return true;
     }
 
-    void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+    void done()
     {
+        //
+        // We don't care about the ending.
+        //
     }
 
-    void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+    void failed(const Ice::Exception&)
     {
+        //
+        // Operationally, we don't really care but we should probably log
+        //
     }
+    
+private:
+    BridgeImplPtr mBridge;
+    BridgePrx mPrx;
+};
 
-    void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, 
-      const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&)
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+    GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+        QueuedTask("GenericAMDCallback"),
+        mCallback(cb),
+        mTracker(tracker)
     {
     }
+protected:
 
-    void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
+    bool executeImpl()
     {
-        std::vector<BridgeImpl::BridgeSessionPtr> sessions(mBridge->currentSessions());
-        if(sessions.size() > 0)
-        {
-            std::for_each(sessions.begin(), sessions.end(), RingImpl(source));
-        }
+        mCallback->ice_response();
+        return true;
     }
 
-    void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, 
-      const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+    void failImpl()
     {
-        size_t endpointCount = mBridge->sessionStopped(source, response);
-        if(endpointCount < 2)
+        SessionErrorSeq errors(mTracker->getExceptions());
+        if (!errors.empty())
         {
-            mBridge->spawnShutdown();
+            mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+        }
+        else
+        {
+            mCallback->ice_exception();
         }
     }
 
-    void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+private:
+    T mCallback;
+    SessionsTrackerPtr mTracker;
+};
+
+class UpdateTask : public QueuedTask
+{
+public:
+    UpdateTask(const BridgeImplPtr& bridge) :
+        QueuedTask("UpdateTask"),
+        mBridge(bridge)
     {
     }
 
+protected:
+    bool executeImpl()
+    {
+        mBridge->forceUpdate();
+        return true;
+    }
 private:
     BridgeImplPtr mBridge;
 };
 
-} // End of namespace BridgeService
-} // End of namespace AsteriskSCF
-
-AsteriskSCF::BridgeService::BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter, 
-  const std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>& listeners, 
-  const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr, 
-  const AsteriskSCF::SessionCommunications::V1::BridgePrx& prx) :
-    mState(Running),
+} // End of anonymous namespace
+
+BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter, 
+        const vector<BridgeListenerPrx>& listeners, 
+        const BridgeListenerMgrPtr& listenerMgr,
+        const ReplicatorSmartPrx& replicator,
+        const BridgeStateItemPtr& state,
+        const Logger& logger) :
+    mActivated(false),
+    mState(state),
+    mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger)),
+    mName(name),
     mObjAdapter(adapter),
     mListeners(listenerMgr),
-    mSessionListener(new SessionListener(this)),
-    mPrx(prx)
+    mReplicator(replicator),
+    mSessionListener(createSessionListener(mSessions, logger)),
+    mLogger(logger)
 {
-    for(std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>::const_iterator i = listeners.begin(); 
+    mLogger(Debug) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
+    for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin(); 
       i != listeners.end(); ++i)
     {
         mListeners->addListener(*i);
     }
-    std::string listenerId = mObjAdapter->getCommunicator()->identityToString(prx->ice_getIdentity());
-    listenerId += ".sessionListener";
-    mSessionListenerPrx =
-        AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(
-            mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId))
-            );
 }
 
-AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
+BridgeImpl::~BridgeImpl()
 {
     //
     // TODO: Determine if we need to clean up the listener manager. We may not
@@ -459,525 +522,507 @@ AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
     //
 }
 
-void AsteriskSCF::BridgeService::BridgeImpl::addSessions(
-  const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
+void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions,
+        const Ice::Current& current)
 {
-    if(sessions.size() == 0)
-    {
-        return;
-    }
-    checkSessions(sessions);
-    AsteriskSCF::SessionCommunications::V1::SessionSeq addedSessions;
+    try
     {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        statePreCheck();
-        lg(Debug) << __FUNCTION__ << ": adding " << sessions.size() << " sessions to " 
-            << current.adapter->getCommunicator()->identityToString(current.id);
-        for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
-            i != sessions.end(); ++i)
+        if (sessions.empty())
         {
-            //
-            // TODO: how do we want to handle sessions that have already been added to the bridge. Its pretty much
-            // impossible to guard against race conditions where multiple call paths might want to add a session
-            // more than once for some reason. We should probably just log it and move on.
-            //
-            std::vector<BridgeSessionPtr>::iterator j = 
-                find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
-            if(j != mSessions.end())
-            {
-                lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString() 
-                    << " is already registered with this bridge.";
-                continue;
-            }
-
-            AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info;
-            try
+            if (callback)
             {
-                RetryPolicy policy(5, 500);
-                //
-                // canRetry should never return false since we throw ourselves out of this loop. But
-                // we'll do it here in case we decide to do something else.
-                //
-                while(policy.canRetry())
-                {
-                    try
-                    {
-                        info = (*i)->setBridge(mPrx, mSessionListenerPrx);
-                        break;
-                    }
-                    catch(const Ice::ConnectionLostException&)
-                    {
-                        if(!policy.retry())
-                        {
-                            throw;
-                        }
-                    }
-                }
+                callback->ice_response();
             }
-            catch(const Ice::Exception& ex)
-            {
-                lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString() << " threw " << ex.what() 
-                    << " continuing";
-            }
-            //
-            // We need to define these states! Especially the ones that define when start is called or not.
-            //
-            if(info->currentState == "ready")
-            {
-                lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString() 
-                    << " current state is ready (not yet connected), not establishing media connections.";
-                mSessions.push_back(new BridgeSession(*i, 0, false));
-            }
-            else
-            {
-                lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString() 
-                    << " media is expected to be establishing, plugging media into bridge.";
-                mSessions.push_back(new BridgeSession(*i, mSplicer.connect(*i), false));;
-            }
-
-            addedSessions.push_back(*i);
+            return;
         }
+        checkSessions(sessions);
+        statePreCheck();
+        mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
+
+        SessionsTrackerPtr tracker(new SessionsTracker);
+        QueuedTasks tasks;
+        tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+        tasks.push_back(new AddToListeners(mListeners, tracker));
+        tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
+        tasks.push_back(new UpdateTask(this));
+        ExecutorPtr executor(new Executor(tasks, mLogger));
+        executor->start();
+        //
+        // When the operations have all completed, that last task will take care of handling
+        // the callback. It's all left withing the try/catch in the event something happens during
+        // task startup.
+        //
     }
-    if(addedSessions.size())
+    catch (const std::exception& ex)
     {
-        mListeners->sessionsAdded(addedSessions);
+        callback->ice_exception(ex);
+    }
+    catch (...)
+    {
+        callback->ice_exception();
     }
 }
 
-void AsteriskSCF::BridgeService::BridgeImpl::removeSessions(
-  const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current&)
+void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
+        const Ice::Current&)
 {
-    if(sessions.size() == 0)
-    {
-        return;
-    }
-    checkSessions(sessions);
-    AsteriskSCF::SessionCommunications::V1::SessionSeq removedSessions;
+    try
     {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (sessions.empty())
+        {
+            callback->ice_response();
+            return;
+        }
+        checkSessions(sessions);
         statePreCheck();
-        for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin(); 
-          i != sessions.end(); ++i)
+
+        //
+        // The shutdown of individual sessions are implemented as series of AMI requests. Once initiated,
+        // we allow them to proceed asynchronously and do not concern ourselves with the result.
+        // The logic of shutdown should remove them either because the operations succeeded or
+        // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+        // is pointless.
+        //
+        SessionsTrackerPtr removed(new SessionsTracker);
+        for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
         {
-            std::vector<BridgeSessionPtr>::iterator j = std::find_if(mSessions.begin(), 
-              mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
-            if(j != mSessions.end())
+            SessionWrapperPtr session = mSessions->getSession(*i);
+            if (session)
             {
-                try
-                {
-                    (*j)->getSession()->removeBridge(mSessionListenerPrx);
-                }
-                catch(const Ice::Exception& ex)
-                {
-                    lg(Info) << __FUNCTION__ << ": removingthe bridge from " << (*j)->getSession() << " threw " 
-                        << ex.what();
-                }
-                (*j)->disconnect();
-                mSessions.erase(j);
... 8618 lines suppressed ...


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list