[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "bridge-replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Mar 3 08:47:42 CST 2011


branch "bridge-replication" has been updated
       via  381d74f1d554b08eaa22db45062fd709b35a4ce5 (commit)
       via  5f3eac5c37e27f7d0aaf3b8ee19eae4410c124d7 (commit)
      from  648a2edcda7992f8e4e7e72eb5c9f8ce4d0f4d20 (commit)

Summary of changes:
 config/test_bridging.conf.in           |   21 ++-
 src/BridgeImpl.cpp                     |  335 ++++++++++++++++++++++----------
 src/BridgeImpl.h                       |    8 +-
 src/BridgeManagerImpl.cpp              |   25 ++-
 src/BridgeManagerImpl.h                |    2 +
 src/BridgeReplicatorIf.ice             |   12 +-
 src/BridgeReplicatorStateListenerI.cpp |   94 +++++++++-
 src/CMakeLists.txt                     |    2 +
 src/DebugUtil.h                        |   23 ---
 src/ListenerManager.h                  |   27 ++-
 src/SessionCollection.cpp              |   46 ++++-
 src/SessionCollection.h                |    2 +
 src/SessionListener.cpp                |   11 +-
 src/SessionWrapper.cpp                 |  129 +++++++++----
 src/SessionWrapper.h                   |   13 +-
 test/BridgeManagerListenerI.cpp        |    1 -
 test/TestBridging.cpp                  |   16 +-
 17 files changed, 554 insertions(+), 213 deletions(-)


- Log -----------------------------------------------------------------
commit 381d74f1d554b08eaa22db45062fd709b35a4ce5
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Mar 3 11:12:36 2011 -0330

    Implemented proper state removal. It wasn't clear where state should actually
    be removed from the replicator until all of the lifecycle relationships between
    the refactored objects were "solidified".

diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 60bb791..d647416 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -98,7 +98,8 @@ public:
     void activate(const BridgePrx& proxy);
 
     void updateState(const BridgeStateItemPtr& state);
-    void updateListener(const BridgeListenerStateItemPtr& update);
+    void addListener(const BridgeListenerStateItemPtr& update);
+    void removeListener(const BridgeListenerStateItemPtr& update);
     
     void activate();
     string id();
@@ -162,6 +163,11 @@ private:
     void pushUpdate(const BridgeStateItemPtr& update);
     void pushUpdates(const ReplicatedStateItemSeq& updates);
     BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
+
+    bool replicate()
+    {
+        return (mActivated && mReplicator != 0);
+    }
 };
 typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
 
@@ -185,7 +191,6 @@ static void checkSessions(const SessionSeq& sessions)
     }
 }
 
-
 //
 // Compiled in constants.
 //
@@ -427,6 +432,7 @@ void BridgeImpl::destroy(const Ice::Current& current)
         mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
         mListeners->stopped();
         mSessionListener = 0;
+        update = createUpdate();
     }
     pushUpdate(update);
 
@@ -447,8 +453,11 @@ void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Curre
         //
         return;
     }
-    
-    if (mListeners->addListener(listener))
+
+    //
+    // Careful about ordering lest short-circuit become an issue
+    //
+    if (mListeners->addListener(listener) && replicate())
     {
         ReplicatedStateItemSeq seq;
         seq.push_back(createFirstListenerUpdate(listener));
@@ -469,15 +478,21 @@ void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Cu
     }
     if (mListeners->removeListener(listener))
     {
-        BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
-        listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
-        listenerUpdate->serial = SerialCounterStart + 1; // There are really only two updates for a listener object.
-        listenerUpdate->listener = listener;
-        listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Delete;
-        listenerUpdate->bridgeId = mState->key;
-        ReplicatedStateItemSeq seq;
-        seq.push_back(listenerUpdate);
-        mReplicator->setState(seq);
+        string key = mState->key + ".listener." +
+            mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+        if (replicate())
+        {
+            try
+            {
+                Ice::StringSeq keys;
+                keys.push_back(key);
+                mReplicator->removeState(keys);
+            }
+            catch (const std::exception& ex)
+            {
+                mLogger(Error) << "call to remove state item " << key << " failed with exception " << ex.what();
+            }
+        }
     }
 }
 
@@ -584,12 +599,25 @@ void BridgeImpl::destroyImpl()
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         mState->runningState = Destroyed;
         mSessions = 0;
-        
+
         mObjAdapter->remove(mPrx->ice_getIdentity());
     }
     catch (const Ice::Exception&)
     {
     }
+
+    try
+    {
+        if (replicate())
+        {
+            Ice::StringSeq keys;
+            keys.push_back(mState->key);
+            mReplicator->removeState(keys);
+        }
+    }
+    catch (const Ice::Exception&)
+    {
+    }
 }
 
 void BridgeImpl::shutdownImpl(const Ice::Current& current)
@@ -613,7 +641,6 @@ void BridgeImpl::activate(const BridgePrx& proxy)
 
     ReplicatedStateItemSeq initialUpdates;
     BridgeStateItemPtr update = createUpdate();
-    update->lifeCycle = ReplicatedItemLifeCycle::Created;
     initialUpdates.push_back(update);
 
     vector<BridgeListenerPrx> listeners = mListeners->getListeners();
@@ -628,27 +655,20 @@ void BridgeImpl::activate(const BridgePrx& proxy)
 void BridgeImpl::updateState(const BridgeStateItemPtr& state)
 {
     mLogger(Debug) << FUNLOG;
-    if (state->lifeCycle == ReplicatedItemLifeCycle::Delete)
-    {
-        destroyImpl();
-        return;
-    }
     boost::unique_lock<boost::shared_mutex> lock(mLock);
     *mState.get() = *state.get();
 }
 
-void BridgeImpl::updateListener(const BridgeListenerStateItemPtr& update)
+void BridgeImpl::addListener(const BridgeListenerStateItemPtr& update)
 {
-    if (update->lifeCycle == ReplicatedItemLifeCycle::Delete)
-    {
-        mListeners->removeListener(update->listener);
-    }
-    else
-    {
-        mListeners->addListener(update->listener);
-    }
+    mListeners->addListener(update->listener);
 }
 
+void BridgeImpl::removeListener(const BridgeListenerStateItemPtr& update)
+{
+    mListeners->removeListener(update->listener);
+}
+    
 void BridgeImpl::activate()
 {
     mLogger(Debug) << FUNLOG;
@@ -727,17 +747,23 @@ void BridgeImpl::statePreCheck()
 BridgeStateItemPtr BridgeImpl::createUpdate()
 {
     ++mState->serial;
-    BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
-    if (mState->runningState == Destroyed)
+    
+    //
+    // There is no point in going through the cost of the copy if it is not going to be replicated. There is a slight
+    // race condition with pushUpdate(s) and the replicator "appearing", in which case the update wil just go out the
+    // next time the state changes.
+    //
+    if (replicate())
     {
-        mState->lifeCycle = ReplicatedItemLifeCycle::Delete;
+        BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
+        return result;
     }
-    return result;
+    return 0;
 }
 
 void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
 {
-    if (mActivated && update)
+    if (update && replicate())
     {
         ReplicatedStateItemSeq seq;
         seq.push_back(update);
@@ -747,7 +773,10 @@ void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
 
 void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
 {
-    mReplicator->setState(update);
+    if (update.size() > 0 && replicate())
+    {
+        mReplicator->setState(update);
+    }
 }
 
 BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeListenerPrx& listener)
@@ -756,7 +785,6 @@ BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeLis
     listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
     listenerUpdate->serial = SerialCounterStart;
     listenerUpdate->listener = listener;
-    listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Created;
     listenerUpdate->bridgeId = mState->key;
     return listenerUpdate;
 }
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index a6e3523..a87acd6 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -104,7 +104,8 @@ public:
      **/
     virtual void updateState(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state) = 0;
 
-    virtual void updateListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
+    virtual void addListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
+    virtual void removeListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
 
     virtual std::string id() = 0;
 
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 91a1eaa..a78c3d9 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -94,6 +94,8 @@ public:
 
     void createBridgeReplica(const BridgeStateItemPtr& bridgeState);
 
+    void removeBridge(const BridgeStateItemPtr& bridgeState);
+
 private:
 
     boost::shared_mutex mLock;
@@ -388,6 +390,22 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
     mBridges.push_back(info);
 }
 
+void BridgeManagerImpl::removeBridge(const BridgeStateItemPtr& state)
+{
+    mLogger(Debug) << FUNLOG;
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    for (vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end(); ++i)
+    {
+        if (i->servant->id() == state->bridgeId)
+        {
+            i->servant->destroyImpl();
+            mBridges.erase(i);
+            break;
+        }
+    }
+    mLogger(Debug) << FUNLOG << ": reaping completed, bridge set size is now " << mBridges.size() << "." ;
+}
+
 void BridgeManagerImpl::reap()
 {
     mLogger(Debug) << FUNLOG << ": reaping bridge set of " << mBridges.size() << " bridges." ;
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index f466000..d168f4e 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -53,6 +53,8 @@ public:
     virtual std::string id() = 0;
 
     virtual void createBridgeReplica(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
+
+    virtual void removeBridge(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
 };
 
 typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index 60befec..e31c153 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -42,13 +42,6 @@ const string StateReplicatorDiscoveryCategory = "BridgeReplicator";
  **/
 const int SerialCounterStart = 100;
 
-enum ReplicatedItemLifeCycle
-{
-    Created,
-    Update,
-    Delete
-};
-
 /**
  *
  * Base class for replicated state.
@@ -62,7 +55,6 @@ class ReplicatedStateItem
 {
     string key;                 /* unique identifier for this state item */
     long serial;                /* a version identifier */
-    ReplicatedItemLifeCycle lifeCycle = Update;    
 };
 sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
 
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index bc70ab5..c5060d0 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -41,11 +41,67 @@ public:
     {
     }
 
-    void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
+    void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
     {
         for (Ice::StringSeq::const_iterator k = itemKeys.begin(); k != itemKeys.end(); ++k)
         {
-            mItems.erase((*k));
+            map<string, ReplicatedStateItemPtr>::iterator entry =  mItems.find((*k));
+            if (entry != mItems.end())
+            {
+                ReplicatedStateItemPtr item = entry->second;
+                mItems.erase(entry);
+                BridgedSessionPtr bridgedSessionItem = BridgedSessionPtr::dynamicCast(item);
+                if (bridgedSessionItem)
+                {
+                    vector<BridgeServantPtr> bridges = mManager->getBridges();
+                    bool found = false;
+                    for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+                    {
+                        if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
+                        {
+                            SessionCollectionPtr sessions = (*b)->sessions();
+                            sessions->removeSession(bridgedSessionItem);
+                            //
+                            // Keep the session list clean.
+                            //
+                            found = true;
+                        }
+                        //
+                        // We could break here if we could be sure that there were no other updates.
+                        //
+                    }
+                    continue;
+                }
+                
+                BridgeListenerStateItemPtr bridgeListener = BridgeListenerStateItemPtr::dynamicCast(item);
+                if (bridgeListener)
+                {
+                    vector<BridgeServantPtr> bridges = mManager->getBridges();
+                    for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+                    {
+                        if ((*b) && (*b)->id() == bridgeListener->bridgeId)
+                        {
+                            (*b)->removeListener(bridgeListener);
+                        }
+                        //
+                        // We could break here if we could be sure that there were no other updates.
+                        //
+                    }
+
+                    continue;
+                }
+                BridgeStateItemPtr bridgeItem = BridgeStateItemPtr::dynamicCast(item);
+                if (bridgeItem)
+                {
+                    dumpState(cerr, bridgeItem, current.adapter->getCommunicator());
+                    mManager->removeBridge(bridgeItem);
+                    continue;
+                }
+
+                ///
+                // The bridge manager isn't really removable.
+                //
+            }
         }
     }
 
@@ -111,7 +167,8 @@ public:
                     //
                 }
               
-                if (!found && bridgeItem->lifeCycle != ReplicatedItemLifeCycle::Delete)
+
+                if (!found)
                 {
                     if (existingItem)
                     {
@@ -160,7 +217,7 @@ public:
                 {
                     if ((*b) && (*b)->id() == bridgeListener->bridgeId)
                     {
-                        (*b)->updateListener(bridgeListener);
+                        (*b)->addListener(bridgeListener);
                     }
                     //
                     // We could break here if we could be sure that there were no other updates.
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index 519fec9..cfd8ac7 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -139,7 +139,10 @@ void SessionCollection::reap()
     set<BridgedSessionState> states;
     states.insert(BridgedSessionState::Disconnected);
     states.insert(BridgedSessionState::Done);
-    
+
+    //
+    // Pick out all of the sessions that should be reaped and remove them from the map.
+    //
     vector<SessionWrapperPtr> reaped;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -156,36 +159,35 @@ void SessionCollection::reap()
             }
         }
     }
-    for (vector<SessionWrapperPtr>::iterator i = reaped.begin(); i != reaped.end(); ++i)
+
+    if (mReplicator)
     {
-        (*i)->reaped();
+        Ice::StringSeq removedKeys;
+        for (vector<SessionWrapperPtr>::iterator i = reaped.begin(); i != reaped.end(); ++i)
+        {
+            removedKeys.push_back((*i)->id());
+        }
+        mReplicator->removeState(removedKeys);
     }
+    //
+    // Unless the wrapper is being held for some other reason, it should be deleted
+    // after it goes out of this scope.
+    //
 }
 
 void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
 {
     SessionWrapperPtr updater;
-    bool removal = session->lifeCycle == ReplicatedItemLifeCycle::Delete;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         SessionMap::iterator i = mMap.find(session->key);
         if (i == mMap.end())
         {
-            if (!removal)
-            {
-                mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
-            }
+            mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
         }
         else
         {
-            if (!removal)
-            {
-                updater = i->second;
-            }
-            else
-            {
-                mMap.erase(i);
-            }
+            updater = i->second;
         }
     }
     if (updater)
@@ -193,3 +195,13 @@ void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
         updater->update(session);
     }
 }
+
+void SessionCollection::removeSession(const BridgedSessionPtr& session)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    SessionMap::iterator i = mMap.find(session->key);
+    if (i != mMap.end())
+    {
+        mMap.erase(i);
+    }
+}
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index ea3aed0..a93f7c2 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -117,6 +117,8 @@ public:
     void reap();
 
     void replicaUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& bridgedSession);
+
+    void removeSession(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& bridgedSession);
     
 private:
 
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 6d08241..e5eeb5e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -29,7 +29,8 @@ SessionWrapper::SessionWrapper(const BridgedSessionPtr& session, const Replicato
         const Logger& logger) :
     mSession(session),
     mReplicator(replicator),
-    mLogger(logger)
+    mLogger(logger),
+    mId(mSession->key)
 {
 }
 
@@ -158,20 +159,14 @@ bool SessionWrapper::isDestroyed()
     return mSession->currentState == BridgedSessionState::Done;
 }
 
-void SessionWrapper::reaped()
+string SessionWrapper::id()
 {
-    BridgedSessionPtr update;
-    {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        mSession->lifeCycle = ReplicatedItemLifeCycle::Delete;
-        update = createUpdate();
-    }
-    pushUpdate(update);
+    return mId;
 }
 
 void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
 {
-    if (update)
+    if (update && mReplicator)
     {
         ReplicatedStateItemSeq seq;
         seq.push_back(update);
@@ -197,8 +192,13 @@ void SessionWrapper::unplugMedia()
 BridgedSessionPtr SessionWrapper::createUpdate()
 {
     //
-    // While there is a little overhead copying everytime there is an update, this decouples the locking from the replication process.
+    // While there is a little overhead copying everytime there is an update, this decouples the locking from the
+    // replication process.
     //
     ++mSession->serial;
-    return new BridgedSession(*mSession.get());
+    if (mReplicator)
+    {
+        return new BridgedSession(*mSession.get());
+    }
+    return 0;
 }
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 6d5b5a9..e8e2d66 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -108,14 +108,8 @@ public:
      **/
     bool isDestroyed();
 
-    /**
-     *
-     * Just a final stage in this object's lifecycle. This will cause and update to be replicated out to notify that
-     * this item is no longer needed.
-     * 
-     **/
-    void reaped();
-
+    std::string id();
+    
 private:
 
     mutable boost::shared_mutex mLock;
@@ -123,6 +117,7 @@ private:
     AsteriskSCF::BridgeService::ReplicatorSmartPrx mReplicator;
     MediaConnectorPtr mConnector;
     AsteriskSCF::System::Logging::Logger mLogger;
+    std::string mId;
 
     /**
      * Sends changes to the replication service. This should never occur

commit 5f3eac5c37e27f7d0aaf3b8ee19eae4410c124d7
Author: Brent Eagles <beagles at digium.com>
Date:   Wed Mar 2 18:09:21 2011 -0330

    - Fixed several replication and related lifecycle issues.
    - Fixed a bug introduced in the default bridge listener mechanism when refactoring for replication.

diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 22c0c4a..8b16e1b 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -19,19 +19,32 @@ TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
 
 IceBox.InheritProperties=1
 IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
-IceBox.Service.Replicator=../src at BridgeReplicator::create
+IceBox.Service.Replicator=../src at BridgeReplicator:create
 IceBox.Service.TestBridge=../src at bridgeservice:create
 IceBox.Service.TestBridge2=../src/@bridgeservice:create
 IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
 IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
-IceBox.Service.TestDriver=../test at bridging_unit_test:create
+IceBox.Service.TestDriver=../test at bridge_component_test:create
 
 TestBridge.InstanceName=TestBridge
 TestBridge.BridgeService.Endpoints=default -p 55561
+TestBridge.BridgeService.ThreadPool.Size=4
 TestBridge.Proxy=BridgeManager:default -p 55561
+TestBridge.BridgeServiceInternal.Endpoints=default -p 65461
+TestBridge.BridgeServiceInternal.ThreadPool.Size=4
+
 TestBridge2.InstanceName=TestBridge2
-TestBridge2.BridgeService.Endpoints=default -p 55562
-TestBridge2.Proxy=BridgeManager:default -p 55562
+TestBridge2.BridgeService.Endpoints=default -p 55572
+TestBridge2.BridgeService.ThreadPool.Size=4
+TestBridge2.Proxy=BridgeManager:default -p 55572
+TestBridge2.BridgeServiceInternal.Endpoints=default -p 65462
+TestBridge2.BridgeServiceInternal.ThreadPool.Size=4
+
+Replicator.InstanceName=Replicator
+Replicator.BridgeReplicator.Endpoints=default -p 63242
+Replciator.Proxy=BridgeReplicator:default -p 63242
+Replicator.BridgeReplicator.ThreadPool.Size=4
+
 TestChannel.Proxy=TestChannel:default -p 55560
 TestUtilAdapter.Endpoints=default -p 55562
 Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 61df06c..60bb791 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -53,7 +53,7 @@ using namespace std;
  *
  */
 
-namespace 
+namespace
 {
 
 /**
@@ -97,9 +97,9 @@ public:
     void shutdownImpl(const Ice::Current& current);
     void activate(const BridgePrx& proxy);
 
-    BridgeStateItemPtr getState();
     void updateState(const BridgeStateItemPtr& state);
-    void updateState(const BridgedSessionPtr& sessionState);
+    void updateListener(const BridgeListenerStateItemPtr& update);
+    
     void activate();
     string id();
     SessionCollectionPtr sessions();
@@ -158,8 +158,10 @@ private:
     IceUtil::Handle<IceUtil::Thread> mShutdownThread;
 
     void statePreCheck();
-    void update();
-    void updateNoLock();
+    BridgeStateItemPtr createUpdate();
+    void pushUpdate(const BridgeStateItemPtr& update);
+    void pushUpdates(const ReplicatedStateItemSeq& updates);
+    BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
 };
 typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
 
@@ -183,6 +185,7 @@ static void checkSessions(const SessionSeq& sessions)
     }
 }
 
+
 //
 // Compiled in constants.
 //
@@ -206,6 +209,7 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
     mSessionListener(createSessionListener(mSessions, logger)),
     mLogger(logger)
 {
+    mLogger(Debug) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
     for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin(); 
       i != listeners.end(); ++i)
     {
@@ -288,16 +292,20 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
         {
         }
     }
-                
+
+    BridgeStateItemPtr update;
     if (addedSessions.size())
     {
         mListeners->sessionsAdded(addedSessions);
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            update = createUpdate();
+        }
     }
-    update();
+    pushUpdate(update);
 }
 
-void BridgeImpl::removeSessions(
-  const SessionSeq& sessions, const Ice::Current& current)
+void BridgeImpl::removeSessions(const SessionSeq& sessions, const Ice::Current& current)
 {
     if (sessions.size() == 0)
     {
@@ -306,8 +314,7 @@ void BridgeImpl::removeSessions(
     checkSessions(sessions);
     SessionSeq removedSessions;
     statePreCheck();
-    for (SessionSeq::const_iterator i = sessions.begin(); 
-         i != sessions.end(); ++i)
+    for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
     {
         SessionWrapperPtr session = mSessions->getSession(*i);
         if (session)
@@ -334,102 +341,144 @@ void BridgeImpl::removeSessions(
         mListeners->sessionsRemoved(removedSessions);
     }
 
-    CountIfOperation< Negate <IfStateCriteria> > counter(IfStateCriteria(BridgedSessionState::Done));
-    mSessions->visitSessions(counter);
-    if (counter.count() < 2)
+    SessionSeq sessionsLeft(mSessions->getSessionSeq());
+    if (sessionsLeft.size() < 2)
     {
         spawnShutdown();
     }
-    update();
+    mSessions->reap();
 }
 
 SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
 {
     mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
     statePreCheck();
-    set< BridgedSessionState > excludedStates;
-    excludedStates.insert(BridgedSessionState::Done);
-    excludedStates.insert(BridgedSessionState::Disconnected);
-    Negate< IfInCriteria< set< BridgedSessionState >, StateMemberSelector > > whereNot(excludedStates);
-    SelectOperation< Negate< IfInCriteria< set< BridgedSessionState >, StateMemberSelector > >, SessionPrxSelector, SessionSeq> select(whereNot);
-    mSessions->visitSessions(select);
-    return select.results();
+    return mSessions->getSessionSeq();
 }
 
 void BridgeImpl::shutdown(const Ice::Current& current)
 {
     //
+    // In an effort to allow some consistency with replicas, the shutdown operation is broken into
+    // two parts. In an asynchronous version, this would probably be a couple of queued tasks.
+    //
+    
+    //
     // When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
     // no other internal locks.
     //
     mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if (mState->runningState == ShuttingDown)
+
+    BridgeStateItemPtr update;
     {
-        mLogger(Debug) << FUNLOG << ": called when shutting down." ;
-        return;
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mState->runningState == ShuttingDown)
+        {
+            mLogger(Debug) << FUNLOG << ": called when shutting down." ;
+            return;
+        }
+        if (mState->runningState == Destroyed)
+        {
+            mLogger(Debug) << FUNLOG << ": called when destroyed." ;
+            throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+        }
+        mState->runningState = ShuttingDown;
+
+        mListeners->stopping();
+        update = createUpdate();
     }
-    if (mState->runningState == Destroyed)
+    pushUpdate(update);
+    update = 0;
     {
-        mLogger(Debug) << FUNLOG << ": called when destroyed." ;
-        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+        ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
+        mSessions->visitSessions(shutdownOp);
+        mListeners->stopped();
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
+        mState->runningState = Destroyed;
+        //
+        // Remove references to the session listener implementation.
+        //
+        update = createUpdate();
+        mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+        mSessionListener = 0;
     }
-    mState->runningState = ShuttingDown;
-
-    mListeners->stopping();
-
-    ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
-    mSessions->visitSessions(shutdownOp);
-
-    mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
-    mListeners->stopped();
-    mState->runningState = Destroyed;
-
-    //
-    // Remove references to the session listener implementation.
-    //
-    mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
-    mSessionListener = 0;
-
-    updateNoLock();
+    mSessions = 0;
+    pushUpdate(update);
 }
 
 void BridgeImpl::destroy(const Ice::Current& current)
 {
     mLogger(Debug) << FUNLOG << ":" <<  objectIdFromCurrent(current);
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if (mState->runningState == ShuttingDown)
+    BridgeStateItemPtr update;
     {
-        mLogger(Debug) << FUNLOG << ": called when shutting down." ;
-        throw AsteriskSCF::System::Component::V1::ShuttingDown();
-    }
-    if (mState->runningState == Destroyed)
-    {
-        mLogger(Debug) << FUNLOG << ": called when destroyed." ;
-        throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mState->runningState == ShuttingDown)
+        {
+            mLogger(Debug) << FUNLOG << ": called when shutting down." ;
+            throw AsteriskSCF::System::Component::V1::ShuttingDown();
+        }
+        if (mState->runningState == Destroyed)
+        {
+            mLogger(Debug) << FUNLOG << ": called when destroyed." ;
+            throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+        }
+        mState->runningState = Destroyed;
+        mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
+        mListeners->stopped();
+        mSessionListener = 0;
     }
-    mState->runningState = Destroyed;
-    mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
-    mListeners->stopped();
-    mSessionListener = 0;
+    pushUpdate(update);
 
     //
     // Remove references to the session listener implementation.
     //
     mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
-    updateNoLock();
 }
 
-void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current&)
+void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current& current)
 {
-    mListeners->addListener(listener);
-    update();
+    mLogger(Debug) << FUNLOG << ":" <<  objectIdFromCurrent(current) << " with bridge listener: " <<
+        (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
+    if (!listener)
+    {
+        //
+        // TODO should probably throw an exception here.
+        //
+        return;
+    }
+    
+    if (mListeners->addListener(listener))
+    {
+        ReplicatedStateItemSeq seq;
+        seq.push_back(createFirstListenerUpdate(listener));
+        mReplicator->setState(seq);
+    }
 }
 
-void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current&)
+void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current& current)
 {
-    mListeners->removeListener(listener);
-    update();
+    mLogger(Debug) << FUNLOG << ":" <<  objectIdFromCurrent(current) << " with bridge listener: " <<
+        (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
+    if (!listener)
+    {
+        //
+        // TODO should probably throw an exception here.
+        //
+        return;
+    }
+    if (mListeners->removeListener(listener))
+    {
+        BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
+        listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+        listenerUpdate->serial = SerialCounterStart + 1; // There are really only two updates for a listener object.
+        listenerUpdate->listener = listener;
+        listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Delete;
+        listenerUpdate->bridgeId = mState->key;
+        ReplicatedStateItemSeq seq;
+        seq.push_back(listenerUpdate);
+        mReplicator->setState(seq);
+    }
 }
 
 void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq& newSessions, const Ice::Current& current)
@@ -464,7 +513,6 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
     SessionSeq removed;
     removed.push_back(oldSession);
     mListeners->sessionsRemoved(removed);
-    update();
 
     SessionSeq added;
     for (SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
@@ -520,21 +568,23 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
     // Now update the listeners.
     //
     mListeners->sessionsAdded(added);
-    update();
 }
 
 bool BridgeImpl::destroyed()
 {
     boost::shared_lock<boost::shared_mutex> lock(mLock);
     mLogger(Debug) << FUNLOG << ": " << (mState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;
-    return mState == Destroyed;
+    return mState->runningState == Destroyed;
 }
 
 void BridgeImpl::destroyImpl()
 {
-    boost::shared_lock<boost::shared_mutex> lock(mLock);
     try
     {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mState->runningState = Destroyed;
+        mSessions = 0;
+        
         mObjAdapter->remove(mPrx->ice_getIdentity());
     }
     catch (const Ice::Exception&)
@@ -560,22 +610,57 @@ void BridgeImpl::activate(const BridgePrx& proxy)
         SessionListenerPrx::uncheckedCast(
             mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId))
             );
-    update();
+
+    ReplicatedStateItemSeq initialUpdates;
+    BridgeStateItemPtr update = createUpdate();
+    update->lifeCycle = ReplicatedItemLifeCycle::Created;
+    initialUpdates.push_back(update);
+
+    vector<BridgeListenerPrx> listeners = mListeners->getListeners();
+    for (vector<BridgeListenerPrx>::iterator i = listeners.begin(); i != listeners.end(); ++i)
+    {
+        initialUpdates.push_back(createFirstListenerUpdate(*i));
+    }
+    
+    pushUpdates(initialUpdates);
 }
 
-BridgeStateItemPtr BridgeImpl::getState()
+void BridgeImpl::updateState(const BridgeStateItemPtr& state)
 {
     mLogger(Debug) << FUNLOG;
-    BridgeStateItemPtr result(new BridgeStateItem);
-    *result = *mState;
-    return result;
+    if (state->lifeCycle == ReplicatedItemLifeCycle::Delete)
+    {
+        destroyImpl();
+        return;
+    }
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    *mState.get() = *state.get();
 }
 
-void BridgeImpl::updateState(const BridgeStateItemPtr& state)
+void BridgeImpl::updateListener(const BridgeListenerStateItemPtr& update)
+{
+    if (update->lifeCycle == ReplicatedItemLifeCycle::Delete)
+    {
+        mListeners->removeListener(update->listener);
+    }
+    else
+    {
+        mListeners->addListener(update->listener);
+    }
+}
+
+void BridgeImpl::activate()
 {
     mLogger(Debug) << FUNLOG;
     boost::unique_lock<boost::shared_mutex> lock(mLock);
-    *mState.get() = *state.get();
+    mActivated = true;
+    mPrx = BridgePrx::uncheckedCast(mObjAdapter->add(this, mObjAdapter->getCommunicator()->stringToIdentity(mState->key)));
+    string listenerId = mState->key + ".sessionListener";
+    mSessionListenerPrx = SessionListenerPrx::uncheckedCast(
+        mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId)));
+    //
+    // Now this replica should be ready to go!
+    //
 }
 
 string BridgeImpl::id()
@@ -588,13 +673,6 @@ SessionCollectionPtr BridgeImpl::sessions()
     return mSessions;
 }
 
-void BridgeImpl::activate()
-{
-    mLogger(Debug) << FUNLOG;
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    mActivated = true;
-}
-
 namespace
 {
 class ShutdownThread : public IceUtil::Thread
@@ -630,39 +708,59 @@ void BridgeImpl::spawnShutdown()
     }
 }
 
-void BridgeImpl::update()
-{
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    updateNoLock();
-}
-
-void BridgeImpl::updateNoLock()
-{
-    if (mActivated)
-    {
-        ++mState->serial;
-        ReplicatedStateItemSeq seq;
-        seq.push_back(getState());
-        mReplicator->setState(seq);
-    }
-}
-
 void BridgeImpl::statePreCheck()
 {
     boost::shared_lock<boost::shared_mutex> lock(mLock);
 
-    if (mState == ShuttingDown)
+    if (mState->runningState == ShuttingDown)
     {
         mLogger(Debug) << FUNLOG << ": called when shutting down." ;
         throw AsteriskSCF::System::Component::V1::ShuttingDown();
     }
-    if (mState == Destroyed)
+    if (mState->runningState == Destroyed)
     {
         mLogger(Debug) << FUNLOG << ": called when destroyed." ;
         throw Ice::ObjectNotExistException(__FILE__, __LINE__);
     }
 }
 
+BridgeStateItemPtr BridgeImpl::createUpdate()
+{
+    ++mState->serial;
+    BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
+    if (mState->runningState == Destroyed)
+    {
+        mState->lifeCycle = ReplicatedItemLifeCycle::Delete;
+    }
+    return result;
+}
+
+void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
+{
+    if (mActivated && update)
+    {
+        ReplicatedStateItemSeq seq;
+        seq.push_back(update);
+        pushUpdates(seq);
+    }
+}
+
+void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
+{
+    mReplicator->setState(update);
+}
+
+BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeListenerPrx& listener)
+{
+    BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
+    listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+    listenerUpdate->serial = SerialCounterStart;
+    listenerUpdate->listener = listener;
+    listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Created;
+    listenerUpdate->bridgeId = mState->key;
+    return listenerUpdate;
+}
+
 IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
 AsteriskSCF::BridgeService::BridgeServant::create(const string& name, const Ice::ObjectAdapterPtr& objectAdapter,
         const vector<BridgeListenerPrx>& listeners,
@@ -690,5 +788,8 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
         const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
 {
     logger(Debug) << FUNLOG << ": creating replica for " << state->bridgeId;
-    return new BridgeImpl(state->bridgeId, objectAdapter, state->listeners, listenerMgr, replicator, state, logger);
+    IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
+                    listenerMgr, replicator, state, logger));
+    
+    return bridge;
 }
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 3e3c865..a6e3523 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -72,7 +72,7 @@ public:
      * to listeners and bridged sessions to notify them that the bridge is
      * going away will not be made! Use shutdownImpl() for that kind of thing.
      *
-     **/
+      **/
     virtual void destroyImpl() = 0;
 
     /**
@@ -102,11 +102,10 @@ public:
      * TODO: complete documentation.
      *
      **/
-    
-    virtual AsteriskSCF::Bridge::V1::BridgeStateItemPtr getState() = 0;
-
     virtual void updateState(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state) = 0;
 
+    virtual void updateListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
+
     virtual std::string id() = 0;
 
     virtual SessionCollectionPtr sessions() = 0;
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 3f6b4a7..91a1eaa 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -105,7 +105,6 @@ private:
     BridgeManagerPrx mSourceProxy;
     BridgeManagerListenerMgrPtr mListeners;
     Logger mLogger;
-    vector<BridgeListenerPrx> mDefaultBridgeListeners;
 
     BridgeManagerStateItemPtr mState;
 
@@ -180,7 +179,7 @@ BridgePrx BridgeManagerImpl::createBridge(const SessionSeq& sessions,
     Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
     BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
     BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
-    vector<BridgeListenerPrx> listeners(mDefaultBridgeListeners);
+    vector<BridgeListenerPrx> listeners(mState->defaultBridgeListeners);
     if (listener)
     {
         listeners.push_back(listener);
@@ -256,6 +255,7 @@ void BridgeManagerImpl::addDefaultBridgeListener(const BridgeListenerPrx& newLis
     if (mState->defaultBridgeListeners.end() == find_if(mState->defaultBridgeListeners.begin(), mState->defaultBridgeListeners.end(), 
           IdentityComparePredicate<BridgeListenerPrx>(newListener)))
     {
+        mLogger(Debug) << FUNLOG << ": adding new listener "  << current.adapter->getCommunicator()->identityToString(newListener->ice_getIdentity());
         mState->defaultBridgeListeners.push_back(newListener);
     }
     update();
@@ -280,7 +280,8 @@ void BridgeManagerImpl::removeDefaultBridgeListener(const BridgeListenerPrx& toR
 BridgeSeq BridgeManagerImpl::listBridges(const Ice::Current& current)
 {
     mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current); 
-    boost::shared_lock<boost::shared_mutex> lock(mLock);
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    reap();
     BridgeSeq result;
     for (vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end();++i)
     {
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index 0fa5dfe..60befec 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -42,6 +42,13 @@ const string StateReplicatorDiscoveryCategory = "BridgeReplicator";
  **/
 const int SerialCounterStart = 100;
 
+enum ReplicatedItemLifeCycle
+{
+    Created,
+    Update,
+    Delete
+};
+
 /**
  *
  * Base class for replicated state.
@@ -55,6 +62,7 @@ class ReplicatedStateItem
 {
     string key;                 /* unique identifier for this state item */
     long serial;                /* a version identifier */
+    ReplicatedItemLifeCycle lifeCycle = Update;    
 };
 sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
 
@@ -200,15 +208,15 @@ class BridgeStateItem extends ReplicatedStateItem
      **/
     ServiceState runningState;
 
-    /**
-     * These items are replicated separately, but mirrored here.
-     * TODO: see if they can be completely removed.
-     **/
-    BridgedSessionSeq bridgedSessions;
-    BridgeListenerSeq listeners;
     MediaOperationReplicationPolicy mediaReplicationPolicy;
 };
 
+class BridgeListenerStateItem extends ReplicatedStateItem
+{
+    string bridgeId;
+    AsteriskSCF::SessionCommunications::V1::BridgeListener* listener;
+};
+
 /**
  *
  * The BridgeReplicatorListener interface must be implemented by components
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index 7cef728..bc70ab5 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -57,6 +57,10 @@ public:
             ReplicatedStateItemPtr existingItem;
             if (entry != mItems.end())
             {
+                //
+                // Note: Another serial number situation to consider is two updates of the same key with
+                // the same serial number.
+                //
 
                 //
                 // Look at serial numbers and indicate an out of sequence update. We should
@@ -107,11 +111,12 @@ public:
                     //
                 }
               
-                if (!found)
+                if (!found && bridgeItem->lifeCycle != ReplicatedItemLifeCycle::Delete)
                 {
                     if (existingItem)
                     {
-                        mLogger(Error) << "Replica listener has a bridge object that the bridge manager does not know about. This likely indicates an error and should be investigated.";
+                        mLogger(Error) << "Replica listener has a bridge object that the bridge manager "
+                            "does not know about. This likely indicates an error and should be investigated.";
                     }
                     mManager->createBridgeReplica(bridgeItem);
                 }
@@ -129,6 +134,9 @@ public:
                     {
                         SessionCollectionPtr sessions = (*b)->sessions();
                         sessions->replicaUpdate(bridgedSessionItem);
+                        //
+                        // Keep the session list clean.
+                        //
                         found = true;
                     }
                     //
@@ -143,6 +151,29 @@ public:
                 continue;
             }
 
+            BridgeListenerStateItemPtr bridgeListener = BridgeListenerStateItemPtr::dynamicCast((*i));
+            if (bridgeListener)
+            {
+                vector<BridgeServantPtr> bridges = mManager->getBridges();
+                bool found = false;
+                for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+                {
+                    if ((*b) && (*b)->id() == bridgeListener->bridgeId)
+                    {
+                        (*b)->updateListener(bridgeListener);
+                    }
+                    //
+                    // We could break here if we could be sure that there were no other updates.
+                    //
+                }
+                if (!found)
+                {
+                    mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+                }
+             
+                continue;
+            }
+
             mLogger(Info) << "Bridge replicator service received an unrecognized replication item.";
         }
     }
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4685e05..f9b39d1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -11,6 +11,8 @@ asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.cpp)
 asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.h)
 asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionListener.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionListener.h)
 asterisk_scf_component_add_file(bridgeservice SessionWrapper.cpp)
 asterisk_scf_component_add_file(bridgeservice SessionWrapper.h)
 asterisk_scf_component_add_file(bridgeservice SessionCollection.cpp)
diff --git a/src/DebugUtil.h b/src/DebugUtil.h
index 7c9d565..b691050 100644
--- a/src/DebugUtil.h
+++ b/src/DebugUtil.h
@@ -109,29 +109,6 @@ std::ostream& dumpState(std::ostream& os, const AsteriskSCF::Bridge::V1::BridgeS
         default:
             os << "(invalid)\n";
     }
-    int index = 0;
-    for (AsteriskSCF::Bridge::V1::BridgedSessionSeq::const_iterator i = state->bridgedSessions.begin();
-         i != state->bridgedSessions.end(); ++i)
-    {
-        if (index == 0)
-        {
-            os << "Bridged sessions (" << state->bridgedSessions.size() << "):\n";
-            index = 1;
-        }
-        dumpState(os, "\t", *i, comm);
-    }
-    index = 0;
-    for (AsteriskSCF::Bridge::V1::BridgeListenerSeq::const_iterator i = state->listeners.begin();
-         i != state->listeners.end(); ++i)
-    {
-        if (index == 0)
-        {
-            os << "Bridge listeners (" << state->listeners.size() << "):\n";
-        }
-        os << index << ". " << comm->identityToString((*i)->ice_getIdentity()) << '\n';
-        ++index;
-    }
-    index = 0;
     os << "Media replication policy: ";
     switch (state->mediaReplicationPolicy)
     {
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
index c6ef841..c8d6843 100644
--- a/src/ListenerManager.h
+++ b/src/ListenerManager.h
@@ -112,15 +112,19 @@ public:
     // NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
     // and whatnot are not flagged.
     //
-    void addListener(const T& listener)
+    bool addListener(const T& listener)
     {
-        boost::unique_lock<boost::shared_mutex> lock(mLock);
-        if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+        bool added = false;
         {
-            mListeners.push_back(listener);
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+            {
+                mListeners.push_back(listener);
+                added = true;
+            }
         }
 
-        if(mInitialized)
+        if(mInitialized && added)
         {
             IceStorm::QoS qos;
             qos["reliability"] = "ordered";
@@ -129,16 +133,18 @@ public:
             {
                 mTopic->subscribeAndGetPublisher(qos, listener);
             }
-            catch(const IceStorm::AlreadySubscribed&)
+            catch (const IceStorm::AlreadySubscribed&)
             {
                 //
-                // This indicates some kind of inconsistent state.
+                // This indicates some kind of inconsistent state or could
+                // happen if this is a replica.
                 //
             }
         }
+        return added;
     }
 
-    void removeListener(const T& listener)
+    bool removeListener(const T& listener)
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         typename std::vector<T>::iterator i = std::find(mListeners.begin(), mListeners.end(), listener);
@@ -147,9 +153,14 @@ public:
             mListeners.erase(i);
             if(mInitialized)
             {
+                //
+                // unsubscribe doesn't seem to care whether the subscriber was subscribed or not.
+                //
                 mTopic->unsubscribe(listener);
             }
+            return true;
         }
+        return false;
     }
 
     std::vector<T> getListeners()
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index 4dac11f..519fec9 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -136,32 +136,56 @@ size_t SessionCollection::size()
 
 void SessionCollection::reap()
 {
-    for (SessionMap::iterator i = mMap.begin(); i != mMap.end();)
+    set<BridgedSessionState> states;
+    states.insert(BridgedSessionState::Disconnected);
+    states.insert(BridgedSessionState::Done);
+    
+    vector<SessionWrapperPtr> reaped;
     {
-        if (i->second->isDestroyed())
-        {
-            mMap.erase(i++);
-        }
-        else
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        for (SessionMap::iterator i = mMap.begin(); i != mMap.end();)
         {
-            ++i;
+            if (states.find(i->second->getBridgedSession()->currentState) != states.end())
+            {
+                reaped.push_back(i->second);
+                mMap.erase(i++);
+            }
+            else
+            {
+                ++i;
+            }
         }
     }
+    for (vector<SessionWrapperPtr>::iterator i = reaped.begin(); i != reaped.end(); ++i)
+    {
+        (*i)->reaped();
+    }
 }
 
 void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
 {
     SessionWrapperPtr updater;
+    bool removal = session->lifeCycle == ReplicatedItemLifeCycle::Delete;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
         SessionMap::iterator i = mMap.find(session->key);
         if (i == mMap.end())
         {
-            mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+            if (!removal)
+            {
+                mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+            }
         }
         else
         {
-            updater = i->second;
+            if (!removal)
+            {
+                updater = i->second;
+            }
+            else
+            {
+                mMap.erase(i);
+            }
         }
     }
     if (updater)
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index ec58c22..3f0312b 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -60,8 +60,9 @@ public:
             //
             // XXX- media pairings.
             //
-            session->connect();
-            mSessions->visitSessions(ConnectSessionOperation(source, mLogger));
+            session->setConnected();
+            ConnectSessionOperation connector(source, mLogger);
+            mSessions->visitSessions(connector);
         }
         catch (const Ice::Exception& ex)
         {
@@ -85,7 +86,8 @@ public:
 
     void ringing(const SessionPrx& source, const Ice::Current&)
     {
-        mSessions->visitSessions(RingSessionOperation(source, mLogger));
+        RingSessionOperation ringer(source, mLogger);
+        mSessions->visitSessions(ringer);
     }
 
     void stopped(const SessionPrx& source, const ResponseCodePtr& response, const Ice::Current& current)
@@ -93,9 +95,10 @@ public:
         string proxyString = source->ice_toString();
         mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
 
+        SessionWrapperPtr session;
         try
         {
-            SessionWrapperPtr session = mSessions->getSession(source);
+            session = mSessions->getSession(source);
             if (!session)
             {
                 mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 67d03ba..6d08241 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -41,35 +41,40 @@ bool SessionWrapper::isConnected()
 
 void SessionWrapper::connect()
 {
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    if (mSession->currentState == BridgedSessionState::Connected)
+    if (setConnected())
     {
-        return;
+        mSession->session->connect();
     }
-
-    //
-    // TODO: AMI!
-    //
-    mSession->session->connect();
-    mSession->currentState = BridgedSessionState::Connected;
-    pushUpdate();
 }
 
 void SessionWrapper::ring()
 {
-    boost::shared_lock<boost::shared_mutex> lock(mLock);
-    if (mSession->currentState == BridgedSessionState::Connected)
     {
-        return;
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        if (mSession->currentState == BridgedSessionState::Connected)
+        {
+            return;
+        }
     }
     mSession->session->ring();
 }
 
-void SessionWrapper::setConnected()
+bool SessionWrapper::setConnected()
 {
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    mSession->currentState = BridgedSessionState::Connected;
-    pushUpdate();
+    BridgedSessionPtr update;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        //
+        // TODO: Going from done or disconnected to connected is probably a bug. Investigate!
+        //
+        if (mSession->currentState != BridgedSessionState::Connected)
+        {
+            mSession->currentState = BridgedSessionState::Connected;
+            update = createUpdate();
+        }
+    }
+    pushUpdate(update);
+    return (update != 0);
 }
 
 BridgedSessionPtr SessionWrapper::getBridgedSession() const
@@ -86,18 +91,36 @@ SessionPrx SessionWrapper::getSession() const
 
 void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
 {
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    mConnector = connector;
-    mSession->currentState = BridgedSessionState::Connected;
-    pushUpdate();
+    BridgedSessionPtr update;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mSession->currentState == BridgedSessionState::Added)
+        {
+            if (!mConnector)
+            {
+                mConnector = connector;
+            }
+            mSession->currentState = BridgedSessionState::Connected;
+            update = createUpdate();
+        }
+    }
+    pushUpdate(update);
 }
 
 void SessionWrapper::disconnect()
 {
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    unplugMedia();
-    mSession->currentState = BridgedSessionState::Disconnected;
-    pushUpdate();
+    BridgedSessionPtr update;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mSession->currentState != BridgedSessionState::Disconnected &&
+                mSession->currentState != BridgedSessionState::Done)
+        {
+            unplugMedia();
+            mSession->currentState = BridgedSessionState::Disconnected;
+            update = createUpdate();
+        }
+    }
+    pushUpdate(update);
 }
 
 void SessionWrapper::update(const BridgedSessionPtr& update)
@@ -114,21 +137,19 @@ void SessionWrapper::update(const BridgedSessionPtr& update)
     }
 }
 
-void SessionWrapper::pushUpdate()
-{
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    ++mSession->serial;
-    ReplicatedStateItemSeq seq;
-    seq.push_back(mSession);
-    mReplicator->setState(seq);
-}
-
 void SessionWrapper::destroy()
 {
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    unplugMedia();
-    mSession->currentState = BridgedSessionState::Done;
-    pushUpdate();
+    BridgedSessionPtr update;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        if (mSession->currentState != BridgedSessionState::Done)
+        {
+            unplugMedia();
+            mSession->currentState = BridgedSessionState::Done;
+            update = createUpdate();
+        }
+    }
+    pushUpdate(update);
 }
 
 bool SessionWrapper::isDestroyed()
@@ -137,6 +158,27 @@ bool SessionWrapper::isDestroyed()
     return mSession->currentState == BridgedSessionState::Done;
 }
 
+void SessionWrapper::reaped()
+{
+    BridgedSessionPtr update;
+    {
+        boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mSession->lifeCycle = ReplicatedItemLifeCycle::Delete;
+        update = createUpdate();
+    }
+    pushUpdate(update);
+}
+
+void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+{
+    if (update)
+    {
+        ReplicatedStateItemSeq seq;
+        seq.push_back(update);
+        mReplicator->setState(seq);
+    }
+}
+
 void SessionWrapper::unplugMedia()
 {
     if (mConnector)
@@ -151,3 +193,12 @@ void SessionWrapper::unplugMedia()
         }
     }
  }
+
+BridgedSessionPtr SessionWrapper::createUpdate()
+{
+    //
+    // While there is a little overhead copying everytime there is an update, this decouples the locking from the replication process.
+    //
+    ++mSession->serial;
+    return new BridgedSession(*mSession.get());
+}
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index e8d49c6..6d5b5a9 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -57,10 +57,10 @@ public:
     void ring();
 
     /**
-     * Simply sets connected state without initiating additional operations.
-     * Initiates replication.
+     * Simply sets connected state without initiating additional operations.  Returns true if the state was
+     * changed. Initiates replication if the state was changed.
      **/
-    void setConnected();
+    bool setConnected();
 
     /**
      * Accesses the wrapped bridged session. This is a bit more expensive
@@ -108,6 +108,14 @@ public:
      **/
     bool isDestroyed();
 
+    /**
+     *
+     * Just a final stage in this object's lifecycle. This will cause and update to be replicated out to notify that
+     * this item is no longer needed.
+     * 
+     **/
+    void reaped();
+
 private:
 
     mutable boost::shared_mutex mLock;
@@ -120,12 +128,14 @@ private:
      * Sends changes to the replication service. This should never occur
      * unless the host service is active.
      **/
-    void pushUpdate();
+    void pushUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& update);
 
     /**
      * Disconnection helper.
      **/
     void unplugMedia();
+
+    AsteriskSCF::Bridge::V1::BridgedSessionPtr createUpdate();
 };
 
 typedef IceUtil::Handle<SessionWrapper> SessionWrapperPtr;
diff --git a/test/BridgeManagerListenerI.cpp b/test/BridgeManagerListenerI.cpp
index f180353..2abd677 100644
--- a/test/BridgeManagerListenerI.cpp
+++ b/test/BridgeManagerListenerI.cpp
@@ -27,7 +27,6 @@ BridgeManagerListenerI::BridgeManagerListenerI() :
 void BridgeManagerListenerI::bridgeCreated(const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& manager,
     const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge, const Ice::Current&)
 {
-    std::cerr << "XXX: bridgeCreated" << std::endl;
     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
     ++mCreated;
     mMonitor.notify();
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index e1ba7b2..46cd1f1 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -440,8 +440,8 @@ public:
                 //
                 channel.commands()->answer(idA);
                 channel.commands()->answer(idB);
-                mgrPrx->listBridges();
-
+                BridgeSeq bridges = mgrPrx->listBridges();
+                BOOST_CHECK(bridges.size() == 1);
                 BOOST_CHECK(bridgeListener->addedCount() == 2);
                 bridge->shutdown();
 
@@ -450,6 +450,8 @@ public:
                 BOOST_CHECK(find(log, "stop"));
                 channel.commands()->getlog(idB, log);
                 BOOST_CHECK(find(log, "stop"));
+                mgrPrx->removeDefaultBridgeListener(bridgeListenerPrx);
+                mgrPrx->removeListener(listenerPrx);
             }
             catch (const Ice::Exception& ex)
             {
@@ -502,6 +504,7 @@ public:
                 servant->wait(5000);
                 BOOST_CHECK(servant->createCalls() == 2);
                 bridge->shutdown();
+                mgrPrx->removeListener(listenerPrx);
             }
             catch (const Ice::Exception& ex)
             {
@@ -571,8 +574,8 @@ public:
                 dumplog(log);
 
                 sessions = bridge->listSessions();
-                BOOST_CHECK(sessions.size() == 2);
-                BOOST_CHECK(sessions.back() == b);
+                BOOST_CHECK(sessions.size() == 2); // XXX
+                BOOST_CHECK(sessions.back() == b); // XXX
                 bridge->shutdown();
 
             }
@@ -638,7 +641,7 @@ public:
                 //
                 mgrPrx->listBridges();
 
-                BOOST_CHECK(servant->createCalls() == 1);
+                BOOST_CHECK(servant->createCalls() == 1); // XXX
                 mgrPrx->removeListener(listenerPrx);
                 
                 bridge = mgrPrx->createBridge(sessions, 0);
@@ -649,12 +652,13 @@ public:
                 BOOST_CHECK(servant->createCalls() == 1);
                 BridgeSeq bridges = mgrPrx2->listBridges();
                 BridgeSeq bridges2 = mgrPrx->listBridges();
-                BOOST_CHECK(bridges.size() == bridges2.size());
+                BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
                 mgrPrx->addListener(listenerPrx);
                 bridge = mgrPrx->createBridge(sessions, 0);
                 servant->wait(5000);
                 BOOST_CHECK(servant->createCalls() == 2);
                 bridge->shutdown();
+                mgrPrx->removeListener(listenerPrx);
             }
             catch (const Ice::Exception& ex)
             {

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/bridging.git



More information about the asterisk-scf-commits mailing list