[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "bridge-replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Mar 3 13:20:57 CST 2011


branch "bridge-replication" has been updated
       via  007048196f9c9f1f19f35126961804a9928e4312 (commit)
       via  d44c4a263724b7ce6900103bf24dcc0b89a0bd8f (commit)
      from  381d74f1d554b08eaa22db45062fd709b35a4ce5 (commit)

Summary of changes:
 config/test_bridging.conf.in           |   20 ++-
 src/BridgeImpl.cpp                     |    9 +-
 src/BridgeReplicatorIf.ice             |   12 +-
 src/BridgeReplicatorStateListenerI.cpp |   42 +++++-
 src/DebugUtil.h                        |   17 --
 src/MediaSplicer.cpp                   |  253 +++++++++++++++++++++++++++-----
 src/MediaSplicer.h                     |   19 ++-
 src/Service.cpp                        |    2 +-
 src/SessionCollection.cpp              |   12 +-
 src/SessionCollection.h                |    3 +-
 src/SessionListener.cpp                |    4 +-
 src/SessionWrapper.cpp                 |   34 ++++-
 src/SessionWrapper.h                   |    5 +-
 test/TestBridging.cpp                  |    3 +-
 test/UnitTests.cpp                     |   42 +++---
 15 files changed, 355 insertions(+), 122 deletions(-)


- Log -----------------------------------------------------------------
commit 007048196f9c9f1f19f35126961804a9928e4312
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Mar 3 15:50:30 2011 -0330

    Remove some cruft (debug statements, commented out code)

diff --git a/src/DebugUtil.h b/src/DebugUtil.h
index b691050..ba06db1 100644
--- a/src/DebugUtil.h
+++ b/src/DebugUtil.h
@@ -54,23 +54,6 @@ std::ostream& dumpState(std::ostream& os, const std::string& prefix,
         default:
             os << "(invalid)\n";
     }
-#if 0
-    //
-    // XXX: media pairings are probably going to occur elsewhere, update when that's decided.
-    //
-    int index = 0;
-    for (AsteriskSCF::Bridge::V1::MediaPairingSeq::const_iterator i = session->mediaPairings.begin();
-         i != session->mediaPairings.end(); ++i)
-    {
-        if (index == 0)
-        {
-            os << prefix << "Pairings:\n";
-        }
-        os << prefix << "\t" << index << ". source: " << comm->identityToString((*i)->source->ice_getIdentity()) << " to sink : " <<
-            comm->identityToString((*i)->sink->ice_getIdentity()) << '\n';
-        ++index;
-    }
-#endif
         
     return os;
 #else
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 2ca5c88..7ae5176 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -652,8 +652,6 @@ public:
                 BridgeSeq bridges = mgrPrx2->listBridges();
                 BridgeSeq bridges2 = mgrPrx->listBridges();
                 BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
-                cout << __LINE__ << " XXX " << bridges2.size() << endl;
-                cout << __LINE__ << " XXX should be " << bridges.size() << endl;
                 mgrPrx->addListener(listenerPrx);
                 bridge = mgrPrx->createBridge(sessions, 0);
                 servant->wait(5000);

commit d44c4a263724b7ce6900103bf24dcc0b89a0bd8f
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Mar 3 15:44:13 2011 -0330

    - Fix some cleanup bugs on finalized state.
    - Fix media pairing replication.
    - Refactored the MediaSplicer object out of the Bridge Object. Now belongs
      the SessionCollection.
    - Added some addditional tracing.
    - Fixed a test suite bug.
    - Fixed a race condition where an externally initiated connected callback
      can cause a disconnected session to be reborn (disconnection is a
      terminal state for bridged sessions)
    - Updated test configuration file.

diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 8b16e1b..461f245 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -1,22 +1,27 @@
 AsteriskSCFIceStorm.InstanceName=AsteriskSCFIceStorm
 AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 55555
+AsteriskSCFIceStorm.TopicManager.ThreadPool.Size=4
 AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
+AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
 AsteriskSCFIceStorm.Transient=1
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
 
 LoggerAdapter.Endpoints=default
 AsteriskSCF.LoggingService.Endpoints=default
+AsteriskSCF.LoggingService.ThreadPool.Size=4
 AsteriskSCF.LoggingClient.Endpoints=default
+AsteriskSCF.LoggingClient.ThreadPool.Size=4
 AsteriskSCF.TestChannelService.Endpoints=default -p 55560
+AsteriskSCF.TestChannelService.ThreadPool.Size=4
 
-LocatorService.Proxy=LocatorService:default -p 55558
 
 ServiceLocatorManagementAdapter.Endpoints=tcp -p 55557
+ServiceLocatorManagementAdapter.ThreadPool.Size=4
 ServiceLocatorAdapter.Endpoints=tcp -p 55558
-
+ServiceLocatorAdapter.ThreadPool.Size=4
+LocatorService.Proxy=LocatorService:default -p 55558
 ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 55557
 
-TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
-
 IceBox.InheritProperties=1
 IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
 IceBox.Service.Replicator=../src at BridgeReplicator:create
@@ -25,7 +30,7 @@ IceBox.Service.TestBridge2=../src/@bridgeservice:create
 IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
 IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
 IceBox.Service.TestDriver=../test at bridge_component_test:create
-
+ 
 TestBridge.InstanceName=TestBridge
 TestBridge.BridgeService.Endpoints=default -p 55561
 TestBridge.BridgeService.ThreadPool.Size=4
@@ -39,6 +44,7 @@ TestBridge2.BridgeService.ThreadPool.Size=4
 TestBridge2.Proxy=BridgeManager:default -p 55572
 TestBridge2.BridgeServiceInternal.Endpoints=default -p 65462
 TestBridge2.BridgeServiceInternal.ThreadPool.Size=4
+TestBridge2.StateReplicatorListener=yes
 
 Replicator.InstanceName=Replicator
 Replicator.BridgeReplicator.Endpoints=default -p 63242
@@ -47,9 +53,11 @@ Replicator.BridgeReplicator.ThreadPool.Size=4
 
 TestChannel.Proxy=TestChannel:default -p 55560
 TestUtilAdapter.Endpoints=default -p 55562
+TestUtilAdapter.ThreadPool.Size=4
 Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
 
-IceBox.LoadOrder=TestServiceLocator Logger TestBridge TestChannel TestDriver
+IceBox.LoadOrder=TestServiceLocator Logger Replicator TestBridge TestBridge2 TestChannel TestDriver
 
 IceBox.ServiceManager.Endpoints=default -p 56000
+IceBox.ServiceManager.ThreadPool.Size=4
 IceBoxMgr.Proxy=IceBox/ServiceManager:default -p 56000
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index d647416..4dd7969 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -126,11 +126,6 @@ private:
     //
     SessionCollectionPtr mSessions;
 
-    //
-    // TODO: Move this out.. this doesn't belong with the bridge implementation any longer.
-    // 
-    MediaSplicer mSplicer;
-    
     const string mName;
     Ice::ObjectAdapterPtr mObjAdapter;
 
@@ -289,7 +284,7 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
                     // We setup media.
                     // TODO: AMI should come into play here.
                     //
-                    session->setConnector(mSplicer.connect(*i, mLogger));
+                    session->setupMedia();
                 }
             }
         }
@@ -551,7 +546,7 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
                         // We setup media.
                         // TODO: AMI should come into play here.
                         //
-                        session->setConnector(mSplicer.connect(*i, mLogger));
+                        session->setupMedia();
                     }
                     added.push_back(*i);
                     
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index e31c153..691f0e2 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -89,7 +89,7 @@ enum MediaOperationReplicationPolicy
  * to media sources and sinks in the bridge.
  *
  **/
-class MediaPairing extends ReplicatedStateItem
+class MediaPairing
 {
     AsteriskSCF::Media::V1::StreamSource* source;
     AsteriskSCF::Media::V1::StreamSink* sink;
@@ -104,10 +104,10 @@ sequence<MediaPairing> MediaPairingSeq;
 class SessionPairing extends ReplicatedStateItem
 {
     string bridgeKey;
-    string sessionKeyA;
-    string sessionKeyB;
-
-    MediaPairingSeq mediaPairings;
+    string sessionKey;
+    AsteriskSCF::Media::V1::Session* mediaSession;
+    MediaPairingSeq incomingMediaPairings;
+    MediaPairingSeq outgoingMediaPairings;
 };
 
 /**
@@ -144,7 +144,7 @@ class BridgedSession extends ReplicatedStateItem
     BridgedSessionState currentState;
 
     /**
-     * Key to the child session pairing
+     * Key to the child session pairing. TODO!
      **/
     string sessionPairingKey;
 
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index c5060d0..29e5d2c 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -45,10 +45,13 @@ public:
     {
         for (Ice::StringSeq::const_iterator k = itemKeys.begin(); k != itemKeys.end(); ++k)
         {
+
             map<string, ReplicatedStateItemPtr>::iterator entry =  mItems.find((*k));
             if (entry != mItems.end())
             {
                 ReplicatedStateItemPtr item = entry->second;
+                mLogger(Debug) << " received removal of " << (*k) << ": a " << item->ice_id();
+
                 mItems.erase(entry);
                 BridgedSessionPtr bridgedSessionItem = BridgedSessionPtr::dynamicCast(item);
                 if (bridgedSessionItem)
@@ -98,6 +101,10 @@ public:
                     continue;
                 }
 
+                //
+                // Session pairings are cleaned up by way of sessions going away.
+                //
+
                 ///
                 // The bridge manager isn't really removable.
                 //
@@ -109,6 +116,8 @@ public:
     {
         for (ReplicatedStateItemSeq::const_iterator i = items.begin(); i != items.end(); ++i)
         {
+            mLogger(Debug) << " received update " << (*i)->serial << " for " << (*i)->key << " (a " <<
+                (*i)->ice_id() << ")";
             map<string, ReplicatedStateItemPtr>::iterator entry =  mItems.find((*i)->key);
             ReplicatedStateItemPtr existingItem;
             if (entry != mItems.end())
@@ -207,7 +216,38 @@ public:
              
                 continue;
             }
-
+            
+            SessionPairingPtr sessionPairing = SessionPairingPtr::dynamicCast((*i));
+            if (sessionPairing)
+            {
+                vector<BridgeServantPtr> bridges = mManager->getBridges();
+                bool found = false;
+                for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+                {
+                    if ((*b) && (*b)->id() == sessionPairing->bridgeKey)
+                    {
+                        SessionWrapperPtr session = (*b)->sessions()->getSession(sessionPairing->sessionKey);
+                        if (session)
+                        {
+                            session->updateMedia(sessionPairing);
+                        }
+                        //
+                        // Keep the session list clean.
+                        //
+                        found = true;
+                    }
+                    //
+                    // We could break here if we could be sure that there were no other updates.
+                    //
+                }
+                if (!found)
+                {
+                    mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+                }
+             
+                continue;
+            }
+            
             BridgeListenerStateItemPtr bridgeListener = BridgeListenerStateItemPtr::dynamicCast((*i));
             if (bridgeListener)
             {
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index fe49d5d..3268d98 100644
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -17,11 +17,16 @@
 #include <IceUtil/Shared.h>
 #include <IceUtil/Handle.h>
 #include "BridgeServiceConfig.h"
+#include "BridgeReplicatorIf.h"
+#include <string>
+#include <vector>
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::Media::V1;
+using namespace AsteriskSCF::Bridge::V1;
 using namespace std;
 
+
 namespace AsteriskSCF
 {
 namespace BridgeService
@@ -32,8 +37,8 @@ namespace BridgeService
 class MediaSplicerI;
 class MedixMixerPtr;
 
-typedef pair<StreamSinkPrx, StreamSourcePrx> OutgoingPairing;
-typedef pair<StreamSourcePrx, StreamSinkPrx> IncomingPairing;
+typedef pair<AsteriskSCF::Media::V1::StreamSinkPrx, AsteriskSCF::Media::V1::StreamSourcePrx> OutgoingPairing;
+typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::StreamSinkPrx> IncomingPairing;
 
 //
 // TODO: These proxies could use some retry properties added to them...
@@ -43,18 +48,27 @@ typedef pair<StreamSourcePrx, StreamSinkPrx> IncomingPairing;
 class MediaConnectorI : public MediaConnector
 {
 public:
-
-    MediaConnectorI(const vector<OutgoingPairing>& outgoing, const std::vector<IncomingPairing>& incoming,
-        const MediaConnectorPtr& peer,
-        const Logger& logger):
+    MediaConnectorI(const AsteriskSCF::Media::V1::SessionPrx& media,
+            const vector<OutgoingPairing>& outgoing, const vector<IncomingPairing>& incoming,
+            const MediaConnectorPtr& peer,
+            const string& bridgeId,
+            const string& sessionId,
+            const ReplicatorSmartPrx& replicator,
+            const Logger& logger):
+        mMedia(media),
         mOutgoing(outgoing),
         mIncoming(incoming),
         mPeer(peer),
         mConnected(true),
+        mBridgeId(bridgeId),
+        mSessionId(sessionId),
+        mRepCounter(SerialCounterStart),
+        mReplicator(replicator),
         mLogger(logger)
     {
         mLogger(Debug) << FUNLOG << ": setting up a media connector with " << outgoing.size()
                        << " outgoing pairings and " << incoming.size() << " incoming pairings.";
+        mKey = mBridgeId + mSessionId + "media";
         for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
         {
             i->second->setSink(i->first);
@@ -67,6 +81,29 @@ public:
         }
     }
 
+    MediaConnectorI(const SessionPairingPtr& pairing,
+            const MediaConnectorPtr& peer,
+            const ReplicatorSmartPrx& replicator,
+            const Logger& logger) :
+        mMedia(pairing->mediaSession),
+        mPeer(peer),
+        mConnected(true),
+        mBridgeId(pairing->bridgeKey),
+        mSessionId(pairing->sessionKey),
+        mRepCounter(pairing->serial),
+        mReplicator(replicator),
+        mLogger(logger)
+    {
+        for (MediaPairingSeq::iterator i = pairing->outgoingMediaPairings.begin(); i != pairing->outgoingMediaPairings.end(); ++i)
+        {
+            mOutgoing.push_back(OutgoingPairing((*i)->sink, (*i)->source));
+        }
+        for (MediaPairingSeq::iterator i = pairing->incomingMediaPairings.begin(); i != pairing->incomingMediaPairings.end(); ++i)
+        {
+            mIncoming.push_back(IncomingPairing((*i)->source, (*i)->sink));
+        }
+    }
+
     void unplug()
     {
         mLogger(Debug) << FUNLOG << ": called.";
@@ -76,7 +113,7 @@ public:
             mLogger(Debug) << FUNLOG << ": we did not have any media. Contacting the peer connector and telling it to disconnect!";
             MediaConnectorPtr peer;
             {
-                IceUtil::Mutex::Lock lock(mMutex);
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
                 peer = mPeer;
                 mPeer = 0;
             }
@@ -90,7 +127,7 @@ public:
             mLogger(Debug) << FUNLOG << ": media connections unplugged. Let's forget about our peer connector now!";
             MediaConnectorPtr peer;
             {
-                IceUtil::Mutex::Lock lock(mMutex);
+                boost::unique_lock<boost::shared_mutex> lock(mLock);
                 peer = mPeer;
                 mPeer = 0;
             }
@@ -101,18 +138,31 @@ public:
             mPeer = 0;
         }
         mLogger(Debug) << FUNLOG << ": finished unplugging.";
+        if (mReplicator)
+        {
+            try
+            {
+                Ice::StringSeq keys;
+                keys.push_back(mKey);
+                mReplicator->removeState(keys);
+            }
+            catch (const std::exception& ex)
+            {
+                mLogger(Debug) << FUNLOG << ": exception " << ex.what();
+            }
+        }
     }
 
     bool isConnected()
     {
-        IceUtil::Mutex::Lock lock(mMutex);
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
         return mConnected;
     }
 
     void clearConnections()
     {
         mLogger(Debug) << FUNLOG << ": clearing connections.";
-        IceUtil::Mutex::Lock lock(mMutex);
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
         mOutgoing.clear();
         mIncoming.clear();
     }
@@ -120,10 +170,76 @@ public:
     void update(const MediaConnectorPtr& peer, const vector<OutgoingPairing>& outgoing, const std::vector<IncomingPairing>& incoming)
     {
         mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
-        IceUtil::Mutex::Lock lock(mMutex);
-        mPeer = peer;
-        mOutgoing = outgoing;
-        mIncoming = incoming;
+        SessionPairingPtr update;
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            mPeer = peer;
+            mOutgoing = outgoing;
+            mIncoming = incoming;
+            update = createUpdate();
+        }
+        pushUpdate(update);
+    }
+
+    SessionPairingPtr createUpdate()
+    {
+        if (mReplicator)
+        {
+            SessionPairingPtr update(new SessionPairing);
+            update->mediaSession = mMedia;
+            update->bridgeKey = mBridgeId;
+            update->sessionKey = mSessionId;
+            update->key = mKey;
+            update->serial = ++mRepCounter;
+            for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
+            {
+                update->outgoingMediaPairings.push_back(new MediaPairing(i->second, i->first));
+            }
+            for (vector<IncomingPairing>::iterator i = mIncoming.begin(); i != mIncoming.end(); ++i)
+            {
+                update->incomingMediaPairings.push_back(new MediaPairing(i->first, i->second));
+            }
+            return update;
+        } 
+        return 0;       
+    }
+
+    void pushUpdate(const SessionPairingPtr& update)
+    {
+        if (mReplicator && update)
+        {
+            try
+            {
+                ReplicatedStateItemSeq seq;
+                seq.push_back(update);
+                mReplicator->setState(seq);
+            }
+            catch (const std::exception& ex)
+            {
+                mLogger(Debug) << ": exception " << ex.what();
+            }
+        }
+    }
+
+    void initialUpdate()
+    {
+        pushUpdate(createUpdate());
+    }
+
+    void update(const SessionPairingPtr& update)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mRepCounter = update->serial;
+        mOutgoing.clear();
+        mIncoming.clear();
+        for (MediaPairingSeq::iterator i = update->outgoingMediaPairings.begin(); i != update->outgoingMediaPairings.end(); ++i)
+        {
+            mOutgoing.push_back(OutgoingPairing((*i)->sink, (*i)->source));
+        }
+        for (MediaPairingSeq::iterator i = update->incomingMediaPairings.begin(); i != update->incomingMediaPairings.end(); ++i)
+        {
+            mIncoming.push_back(IncomingPairing((*i)->source, (*i)->sink));
+        }
     }
 
     bool disconnectMedia()
@@ -131,14 +247,17 @@ public:
         mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
         vector<OutgoingPairing> outgoing;
         vector<IncomingPairing> incoming;
+        SessionPairingPtr update;
         {
-            IceUtil::Mutex::Lock lock(mMutex);
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
             outgoing = mOutgoing;
             mOutgoing.clear();
             incoming = mIncoming;
             mIncoming.clear();
             mConnected = false;
+            update = createUpdate();
         }
+        pushUpdate(update);
         if (outgoing.size() == 0 && incoming.size() == 0)
         {
             return false;
@@ -193,12 +312,23 @@ public:
         return true;
     }
 
+    string id()
+    {
+        return mKey;
+    }
+
 private:
-    IceUtil::Mutex mMutex;
+    boost::shared_mutex mLock;
+    AsteriskSCF::Media::V1::SessionPrx mMedia;
     vector<OutgoingPairing> mOutgoing;
     vector<IncomingPairing> mIncoming;
     MediaConnectorPtr mPeer;
     bool mConnected;
+    string mBridgeId;
+    string mSessionId;
+    string mKey;
+    long mRepCounter;
+    ReplicatorSmartPrx mReplicator;
     Logger mLogger;
 };
 
@@ -212,18 +342,25 @@ class MediaSplicerI : public IceUtil::Shared
 
 public:
 
-    MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
-      const Logger& logger)
+    MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger) :
+        mCommunicator(communicator),
+        mBridgeId(bridgeId),
+        mReplicator(replicator),
+        mLogger(logger)
+    {
+    }
+
+    MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
     {
-        SessionPrx media = session->getMediaSession();
+        AsteriskSCF::Media::V1::SessionPrx media = session->getMediaSession();
         if (!media)
         {
-            logger(Debug) << FUNLOG << ": no media available!";
+            mLogger(Debug) << FUNLOG << ": no media available!";
             return 0;
         }
 
-        IceUtil::Mutex::Lock lock(mMutex);
-        MediaConnectorPtr result;
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        MediaConnectorIPtr result;
 
         //
         // Loop through looking for an identical media session, also reaping disconnected connectors as we go.
@@ -233,20 +370,21 @@ public:
         {
             if (!i->connector->isConnected())
             {
-                logger(Debug) << FUNLOG << ": reaping a connector";
+                mLogger(Debug) << FUNLOG << ": reaping a connector";
                 MediaSessions::iterator t = i;
                 i = mSessions.erase(t);
                 continue;
             }
             if (i->mediaSession == media)
             {
-                result = i->connector;
+                result = MediaConnectorIPtr::dynamicCast(i->connector);
+                assert(result);
             }
             ++i;
         }
         if (result)
         {
-            logger(Debug) << FUNLOG << ": found an existing matching connector, returning that!";
+            mLogger(Debug) << FUNLOG << ": found an existing matching connector, returning that!";
             return result;
         }
 
@@ -273,10 +411,11 @@ public:
         vector<OutgoingPairing> outgoingPairings(findCompatiblePairings(sinks));
         vector<IncomingPairing> incomingPairings(findCompatiblePairings(sources));
 
-
-        result = new MediaConnectorI(outgoingPairings, incomingPairings, existing, logger);
-        logger(Debug) << FUNLOG << ": established connections, returning connector object";
+        string sessionId = mCommunicator->identityToString(session->ice_getIdentity());
+        result = new MediaConnectorI(media, outgoingPairings, incomingPairings, existing, mBridgeId, sessionId, mReplicator, mLogger);
+        mLogger(Debug) << FUNLOG << ": established connections, returning connector object";
         mSessions.push_back(MediaSessionStruct(media, result));
+        result->initialUpdate();
 
         //
         // PRE-ALPHA-ONLY code. When it's a two party bridge and the sessions are added or removed separately,
@@ -290,9 +429,42 @@ public:
         return result;
     }
 
+    MediaConnectorPtr createReplica(const SessionPairingPtr& pairings)
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        MediaConnectorIPtr result;
+
+        MediaSessions::iterator i = mSessions.begin();
+        while (i != mSessions.end())
+        {
+            if (!i->connector->isConnected())
+            {
+                mLogger(Debug) << FUNLOG << ": reaping a connector";
+                MediaSessions::iterator t = i;
+                i = mSessions.erase(t);
+                continue;
+            }
+            ++i;
+        }
+
+        MediaConnectorPtr existing;
+        if (mSessions.size() == 1)
+        {
+            existing = mSessions.back().connector;
+            existing->clearConnections();
+        }
+        result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
+        mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+        return result;
+    }
+
 private:
 
-    IceUtil::Mutex mMutex;
+    boost::shared_mutex mLock;
+    Ice::CommunicatorPtr mCommunicator;
+    string mBridgeId;
+    ReplicatorSmartPrx mReplicator;
+    Logger mLogger;
 
     struct MediaSessionStruct
     {
@@ -436,24 +608,27 @@ private:
     }
 };
 
-typedef IceUtil::Handle<MediaSplicerI> MediaSplicerPtr;
-
 } // End of namespace BridgeService
 } // End of namespace AsteriskSCF
 
-AsteriskSCF::BridgeService::MediaSplicer::MediaSplicer() :
-    mImpl(new MediaSplicerI)
+using namespace AsteriskSCF::BridgeService;
+
+MediaSplicer::MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
+        const Logger& logger) :
+    mImpl(new MediaSplicerI(comm, bridgeId, replicator, logger))
+{
+}
+
+MediaSplicer::~MediaSplicer()
 {
 }
 
-AsteriskSCF::BridgeService::MediaSplicer::~MediaSplicer()
+MediaConnectorPtr MediaSplicer::connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
 {
-    delete mImpl;
+    return mImpl->connect(session);
 }
 
-AsteriskSCF::BridgeService::MediaConnectorPtr AsteriskSCF::BridgeService::MediaSplicer::connect(
-    const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
-    const AsteriskSCF::System::Logging::Logger& logger)
+MediaConnectorPtr MediaSplicer::createReplica(const SessionPairingPtr& pairings)
 {
-    return mImpl->connect(session, logger);
+    return mImpl->createReplica(pairings);
 }
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index 776c3b5..08443d8 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -20,6 +20,8 @@
 #include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
 #include <AsteriskSCF/logger.h>
 #include <memory>
+#include "BridgeServiceConfig.h"
+#include "BridgeReplicatorIf.h"
 
 namespace AsteriskSCF
 {
@@ -33,22 +35,29 @@ public:
     virtual bool isConnected() = 0;
     virtual void clearConnections() = 0;
     virtual bool disconnectMedia() = 0;
+
+    virtual void update(const AsteriskSCF::Bridge::V1::SessionPairingPtr& update) = 0;
 };
 
 typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
 
 class MediaSplicerI;
 
-class MediaSplicer
+class MediaSplicer : public IceUtil::Shared
 {
 public:
-    MediaSplicer();
+    MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
+            const AsteriskSCF::System::Logging::Logger& logger);
     ~MediaSplicer();
-    MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session,
-      const AsteriskSCF::System::Logging::Logger& logger);
+    MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
+
+    MediaConnectorPtr createReplica(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
 
 private:
-    MediaSplicerI* mImpl;
+    boost::shared_ptr<MediaSplicerI> mImpl;
 };
+
+typedef IceUtil::Handle<MediaSplicer> MediaSplicerPtr;
+
 } // End of namespace BridgeService
 } // End of namespace AsteriskSCF
diff --git a/src/Service.cpp b/src/Service.cpp
index 078ae7a..660f18b 100644
--- a/src/Service.cpp
+++ b/src/Service.cpp
@@ -213,7 +213,7 @@ void BridgingApp::start(const std::string& name, const Ice::CommunicatorPtr& com
     bool onStandby = false;
     if (replicator)
     {
-        onStandby = communicator->getProperties()->getPropertyWithDefault(adapterName + "StateReplicatorListener", "no") == "yes";
+        onStandby = communicator->getProperties()->getPropertyWithDefault(name + ".StateReplicatorListener", "no") == "yes";
     }
     if (onStandby)
     {
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index cfd8ac7..e59e6e6 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -40,7 +40,8 @@ SessionCollection::SessionCollection(const Ice::CommunicatorPtr& comm, const str
     mReplicator(replicator),
     mLogger(logger),
     mBridgeId(bridgeId),
-    mSessionCounter(0)
+    mSessionCounter(0),
+    mSplicer(new MediaSplicer(comm, bridgeId, replicator, logger))
 {
 }
  
@@ -90,7 +91,7 @@ SessionWrapperPtr SessionCollection::addSession(const SessionPrx& session)
     bridgedSession->bridgeId = mBridgeId;
     bridgedSession->orderAdded = mSessionCounter;
     
-    SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mReplicator, mLogger));
+    SessionWrapperPtr newSession(new SessionWrapper(bridgedSession, mSplicer,  mReplicator, mLogger));
     mMap[key] = newSession;
     ++mSessionCounter;
     return newSession;
@@ -125,7 +126,10 @@ SessionSeq SessionCollection::getSessionSeq()
     IfInCriteria<set<BridgedSessionState>, StateMemberSelector> active(states);
     SelectOperation< IfInCriteria<set<BridgedSessionState>, StateMemberSelector>, SessionPrxSelector, SessionSeq>
         selectIf(active);
-    return visitSessions(selectIf).results();
+    visitSessions(selectIf);
+    SessionSeq results(selectIf.results());
+    mLogger(Debug) << FUNLOG << " " << results.size();
+    return results;
 }
 
 size_t SessionCollection::size()
@@ -183,7 +187,7 @@ void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
         SessionMap::iterator i = mMap.find(session->key);
         if (i == mMap.end())
         {
-            mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+            mMap[session->key] = new SessionWrapper(session, mSplicer, mReplicator, mLogger);
         }
         else
         {
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index a93f7c2..cf95815 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -22,6 +22,7 @@
 #include <string>
 #include "BridgeServiceConfig.h"
 #include "SessionWrapper.h"
+#include "MediaSplicer.h"
 
 namespace AsteriskSCF
 {
@@ -129,7 +130,7 @@ private:
     AsteriskSCF::System::Logging::Logger mLogger;
     std::string mBridgeId;
     long mSessionCounter;
-
+    MediaSplicerPtr mSplicer;
 };
 
 typedef IceUtil::Handle<SessionCollection> SessionCollectionPtr;
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 3f0312b..280c49e 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -57,10 +57,8 @@ public:
                 return;
             }
 
-            //
-            // XXX- media pairings.
-            //
             session->setConnected();
+            session->setupMedia();
             ConnectSessionOperation connector(source, mLogger);
             mSessions->visitSessions(connector);
         }
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index e5eeb5e..21bca8d 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -25,12 +25,14 @@ using namespace AsteriskSCF::Bridge::V1;
 using namespace AsteriskSCF;
 using namespace std;
 
-SessionWrapper::SessionWrapper(const BridgedSessionPtr& session, const ReplicatorSmartPrx& replicator,
+SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
+        const MediaSplicerPtr& splicer, const ReplicatorSmartPrx& replicator,
         const Logger& logger) :
     mSession(session),
     mReplicator(replicator),
     mLogger(logger),
-    mId(mSession->key)
+    mId(mSession->key),
+    mSplicer(splicer)
 {
 }
 
@@ -62,13 +64,11 @@ void SessionWrapper::ring()
 
 bool SessionWrapper::setConnected()
 {
+    mLogger(Debug) << FUNLOG << " for " << mId;
     BridgedSessionPtr update;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
-        //
-        // TODO: Going from done or disconnected to connected is probably a bug. Investigate!
-        //
-        if (mSession->currentState != BridgedSessionState::Connected)
+        if (mSession->currentState == BridgedSessionState::Added)
         {
             mSession->currentState = BridgedSessionState::Connected;
             update = createUpdate();
@@ -90,8 +90,9 @@ SessionPrx SessionWrapper::getSession() const
     return mSession->session;
 }
 
-void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
+void SessionWrapper::setupMedia()
 {
+    mLogger(Debug) << FUNLOG << " for " << mId;
     BridgedSessionPtr update;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -99,7 +100,7 @@ void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
         {
             if (!mConnector)
             {
-                mConnector = connector;
+                mConnector = mSplicer->connect(mSession->session);
             }
             mSession->currentState = BridgedSessionState::Connected;
             update = createUpdate();
@@ -108,8 +109,23 @@ void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
     pushUpdate(update);
 }
 
+void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
+{
+    mLogger(Debug) << FUNLOG << " for " << mId;
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    if (mConnector)
+    {
+        mConnector->update(pairings);
+    }
+    else
+    {
+        mConnector = mSplicer->createReplica(pairings);
+    }
+}
+
 void SessionWrapper::disconnect()
 {
+    mLogger(Debug) << FUNLOG << ": disconnecting " << mId;
     BridgedSessionPtr update;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -126,6 +142,7 @@ void SessionWrapper::disconnect()
 
 void SessionWrapper::update(const BridgedSessionPtr& update)
 {
+    mLogger(Debug) << FUNLOG << ": for " << mId;
     boost::unique_lock<boost::shared_mutex> lock(mLock);
     if (mSession->serial < update->serial || update->serial < SerialCounterStart)
     {
@@ -140,6 +157,7 @@ void SessionWrapper::update(const BridgedSessionPtr& update)
 
 void SessionWrapper::destroy()
 {
+    mLogger(Debug) << FUNLOG << ": for " << mId;
     BridgedSessionPtr update;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index e8e2d66..e11da7c 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -30,6 +30,7 @@ class SessionWrapper : public IceUtil::Shared
 {
 public:
     SessionWrapper(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& session,
+            const MediaSplicerPtr& splicer,
             const AsteriskSCF::BridgeService::ReplicatorSmartPrx& replicator,
             const AsteriskSCF::System::Logging::Logger& logger);
 
@@ -83,7 +84,8 @@ public:
      * with this session.
      * Initiates replication.
      **/
-    void setConnector(const MediaConnectorPtr& connector);
+    void setupMedia();
+    void updateMedia(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
 
     /**
      * Frees connection related resources.
@@ -118,6 +120,7 @@ private:
     MediaConnectorPtr mConnector;
     AsteriskSCF::System::Logging::Logger mLogger;
     std::string mId;
+    MediaSplicerPtr mSplicer;
 
     /**
      * Sends changes to the replication service. This should never occur
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 46cd1f1..2ca5c88 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -574,8 +574,7 @@ public:
                 dumplog(log);
 
                 sessions = bridge->listSessions();
-                BOOST_CHECK(sessions.size() == 2); // XXX
-                BOOST_CHECK(sessions.back() == b); // XXX
+                BOOST_CHECK(sessions.size() == 2); 
                 bridge->shutdown();
 
             }
@@ -653,6 +652,8 @@ public:
                 BridgeSeq bridges = mgrPrx2->listBridges();
                 BridgeSeq bridges2 = mgrPrx->listBridges();
                 BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
+                cout << __LINE__ << " XXX " << bridges2.size() << endl;
+                cout << __LINE__ << " XXX should be " << bridges.size() << endl;
                 mgrPrx->addListener(listenerPrx);
                 bridge = mgrPrx->createBridge(sessions, 0);
                 servant->wait(5000);
diff --git a/test/UnitTests.cpp b/test/UnitTests.cpp
index c57517f..d14012a 100644
--- a/test/UnitTests.cpp
+++ b/test/UnitTests.cpp
@@ -144,7 +144,7 @@ private:
 BOOST_FIXTURE_TEST_CASE(testIfState, Fixture)
 {
     BridgedSessionPtr letter(createBridgedSession(BridgedSessionState::Added));
-    SessionWrapperPtr wrapper(new SessionWrapper(letter, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
+    SessionWrapperPtr wrapper(new SessionWrapper(letter, 0, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
     IfStateCriteria a(BridgedSessionState::Added);
     IfStateCriteria b(BridgedSessionState::Connected);
     BOOST_CHECK(a(wrapper));
@@ -154,7 +154,7 @@ BOOST_FIXTURE_TEST_CASE(testIfState, Fixture)
 BOOST_FIXTURE_TEST_CASE(testIfInCrriteria, Fixture)
 {
     BridgedSessionPtr letter(createBridgedSession(BridgedSessionState::Added));
-    SessionWrapperPtr wrapper(new SessionWrapper(letter, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
+    SessionWrapperPtr wrapper(new SessionWrapper(letter, 0, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
     set<BridgedSessionState> yesVals;
     yesVals.insert(BridgedSessionState::Added);
     yesVals.insert(BridgedSessionState::Connected);
@@ -170,7 +170,7 @@ BOOST_FIXTURE_TEST_CASE(testIfInCrriteria, Fixture)
 BOOST_FIXTURE_TEST_CASE(testNegation, Fixture)
 {
     BridgedSessionPtr letter(createBridgedSession(BridgedSessionState::Added));
-    SessionWrapperPtr wrapper(new SessionWrapper(letter, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
+    SessionWrapperPtr wrapper(new SessionWrapper(letter, 0, AsteriskSCF::BridgeService::ReplicatorSmartPrx(), getLogger()));
     set<BridgedSessionState> yesVals;
     yesVals.insert(BridgedSessionState::Added);
     yesVals.insert(BridgedSessionState::Connected);
@@ -193,15 +193,15 @@ BOOST_FIXTURE_TEST_CASE(testCountIf, Fixture)
     //
     AsteriskSCF::BridgeService::ReplicatorSmartPrx rep;
     vector<SessionWrapperPtr> testData;
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected), 0, rep, getLogger()));
 
     //
     // Now a variety of test criteria.
@@ -250,17 +250,17 @@ BOOST_FIXTURE_TEST_CASE(testSelect, Fixture)
     //
     AsteriskSCF::BridgeService::ReplicatorSmartPrx rep;
     vector<SessionWrapperPtr> testData;
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "a"), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "a"),
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "a"), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "a"), 0,
                     rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "c"), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "c"), rep,
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "c"), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Added, "b"), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Disconnected, "c"), 0, rep,
                     getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), rep, getLogger()));
-    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "b"), rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Done, "a"), 0, rep, getLogger()));
+    testData.push_back(new SessionWrapper(createBridgedSession(BridgedSessionState::Connected, "b"), 0, rep, getLogger()));
 
     //
     // Now a variety of test criteria.

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list