[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "bridge-replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Mar 3 08:47:42 CST 2011
branch "bridge-replication" has been updated
via 381d74f1d554b08eaa22db45062fd709b35a4ce5 (commit)
via 5f3eac5c37e27f7d0aaf3b8ee19eae4410c124d7 (commit)
from 648a2edcda7992f8e4e7e72eb5c9f8ce4d0f4d20 (commit)
Summary of changes:
config/test_bridging.conf.in | 21 ++-
src/BridgeImpl.cpp | 335 ++++++++++++++++++++++----------
src/BridgeImpl.h | 8 +-
src/BridgeManagerImpl.cpp | 25 ++-
src/BridgeManagerImpl.h | 2 +
src/BridgeReplicatorIf.ice | 12 +-
src/BridgeReplicatorStateListenerI.cpp | 94 +++++++++-
src/CMakeLists.txt | 2 +
src/DebugUtil.h | 23 ---
src/ListenerManager.h | 27 ++-
src/SessionCollection.cpp | 46 ++++-
src/SessionCollection.h | 2 +
src/SessionListener.cpp | 11 +-
src/SessionWrapper.cpp | 129 +++++++++----
src/SessionWrapper.h | 13 +-
test/BridgeManagerListenerI.cpp | 1 -
test/TestBridging.cpp | 16 +-
17 files changed, 554 insertions(+), 213 deletions(-)
- Log -----------------------------------------------------------------
commit 381d74f1d554b08eaa22db45062fd709b35a4ce5
Author: Brent Eagles <beagles at digium.com>
Date: Thu Mar 3 11:12:36 2011 -0330
Implemented proper state removal. It wasn't clear where state should actually
be removed from the replicator until all of the lifecycle relationships between
the refactored objects were "solidified".
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 60bb791..d647416 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -98,7 +98,8 @@ public:
void activate(const BridgePrx& proxy);
void updateState(const BridgeStateItemPtr& state);
- void updateListener(const BridgeListenerStateItemPtr& update);
+ void addListener(const BridgeListenerStateItemPtr& update);
+ void removeListener(const BridgeListenerStateItemPtr& update);
void activate();
string id();
@@ -162,6 +163,11 @@ private:
void pushUpdate(const BridgeStateItemPtr& update);
void pushUpdates(const ReplicatedStateItemSeq& updates);
BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
+
+ bool replicate()
+ {
+ return (mActivated && mReplicator != 0);
+ }
};
typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
@@ -185,7 +191,6 @@ static void checkSessions(const SessionSeq& sessions)
}
}
-
//
// Compiled in constants.
//
@@ -427,6 +432,7 @@ void BridgeImpl::destroy(const Ice::Current& current)
mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
mListeners->stopped();
mSessionListener = 0;
+ update = createUpdate();
}
pushUpdate(update);
@@ -447,8 +453,11 @@ void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Curre
//
return;
}
-
- if (mListeners->addListener(listener))
+
+ //
+ // Careful about ordering lest short-circuit become an issue
+ //
+ if (mListeners->addListener(listener) && replicate())
{
ReplicatedStateItemSeq seq;
seq.push_back(createFirstListenerUpdate(listener));
@@ -469,15 +478,21 @@ void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Cu
}
if (mListeners->removeListener(listener))
{
- BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
- listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
- listenerUpdate->serial = SerialCounterStart + 1; // There are really only two updates for a listener object.
- listenerUpdate->listener = listener;
- listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Delete;
- listenerUpdate->bridgeId = mState->key;
- ReplicatedStateItemSeq seq;
- seq.push_back(listenerUpdate);
- mReplicator->setState(seq);
+ string key = mState->key + ".listener." +
+ mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+ if (replicate())
+ {
+ try
+ {
+ Ice::StringSeq keys;
+ keys.push_back(key);
+ mReplicator->removeState(keys);
+ }
+ catch (const std::exception& ex)
+ {
+ mLogger(Error) << "call to remove state item " << key << " failed with exception " << ex.what();
+ }
+ }
}
}
@@ -584,12 +599,25 @@ void BridgeImpl::destroyImpl()
boost::unique_lock<boost::shared_mutex> lock(mLock);
mState->runningState = Destroyed;
mSessions = 0;
-
+
mObjAdapter->remove(mPrx->ice_getIdentity());
}
catch (const Ice::Exception&)
{
}
+
+ try
+ {
+ if (replicate())
+ {
+ Ice::StringSeq keys;
+ keys.push_back(mState->key);
+ mReplicator->removeState(keys);
+ }
+ }
+ catch (const Ice::Exception&)
+ {
+ }
}
void BridgeImpl::shutdownImpl(const Ice::Current& current)
@@ -613,7 +641,6 @@ void BridgeImpl::activate(const BridgePrx& proxy)
ReplicatedStateItemSeq initialUpdates;
BridgeStateItemPtr update = createUpdate();
- update->lifeCycle = ReplicatedItemLifeCycle::Created;
initialUpdates.push_back(update);
vector<BridgeListenerPrx> listeners = mListeners->getListeners();
@@ -628,27 +655,20 @@ void BridgeImpl::activate(const BridgePrx& proxy)
void BridgeImpl::updateState(const BridgeStateItemPtr& state)
{
mLogger(Debug) << FUNLOG;
- if (state->lifeCycle == ReplicatedItemLifeCycle::Delete)
- {
- destroyImpl();
- return;
- }
boost::unique_lock<boost::shared_mutex> lock(mLock);
*mState.get() = *state.get();
}
-void BridgeImpl::updateListener(const BridgeListenerStateItemPtr& update)
+void BridgeImpl::addListener(const BridgeListenerStateItemPtr& update)
{
- if (update->lifeCycle == ReplicatedItemLifeCycle::Delete)
- {
- mListeners->removeListener(update->listener);
- }
- else
- {
- mListeners->addListener(update->listener);
- }
+ mListeners->addListener(update->listener);
}
+void BridgeImpl::removeListener(const BridgeListenerStateItemPtr& update)
+{
+ mListeners->removeListener(update->listener);
+}
+
void BridgeImpl::activate()
{
mLogger(Debug) << FUNLOG;
@@ -727,17 +747,23 @@ void BridgeImpl::statePreCheck()
BridgeStateItemPtr BridgeImpl::createUpdate()
{
++mState->serial;
- BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
- if (mState->runningState == Destroyed)
+
+ //
+ // There is no point in going through the cost of the copy if it is not going to be replicated. There is a slight
+ // race condition with pushUpdate(s) and the replicator "appearing", in which case the update wil just go out the
+ // next time the state changes.
+ //
+ if (replicate())
{
- mState->lifeCycle = ReplicatedItemLifeCycle::Delete;
+ BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
+ return result;
}
- return result;
+ return 0;
}
void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
{
- if (mActivated && update)
+ if (update && replicate())
{
ReplicatedStateItemSeq seq;
seq.push_back(update);
@@ -747,7 +773,10 @@ void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
{
- mReplicator->setState(update);
+ if (update.size() > 0 && replicate())
+ {
+ mReplicator->setState(update);
+ }
}
BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeListenerPrx& listener)
@@ -756,7 +785,6 @@ BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeLis
listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
listenerUpdate->serial = SerialCounterStart;
listenerUpdate->listener = listener;
- listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Created;
listenerUpdate->bridgeId = mState->key;
return listenerUpdate;
}
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index a6e3523..a87acd6 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -104,7 +104,8 @@ public:
**/
virtual void updateState(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state) = 0;
- virtual void updateListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
+ virtual void addListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
+ virtual void removeListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
virtual std::string id() = 0;
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 91a1eaa..a78c3d9 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -94,6 +94,8 @@ public:
void createBridgeReplica(const BridgeStateItemPtr& bridgeState);
+ void removeBridge(const BridgeStateItemPtr& bridgeState);
+
private:
boost::shared_mutex mLock;
@@ -388,6 +390,22 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
mBridges.push_back(info);
}
+void BridgeManagerImpl::removeBridge(const BridgeStateItemPtr& state)
+{
+ mLogger(Debug) << FUNLOG;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end(); ++i)
+ {
+ if (i->servant->id() == state->bridgeId)
+ {
+ i->servant->destroyImpl();
+ mBridges.erase(i);
+ break;
+ }
+ }
+ mLogger(Debug) << FUNLOG << ": reaping completed, bridge set size is now " << mBridges.size() << "." ;
+}
+
void BridgeManagerImpl::reap()
{
mLogger(Debug) << FUNLOG << ": reaping bridge set of " << mBridges.size() << " bridges." ;
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index f466000..d168f4e 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -53,6 +53,8 @@ public:
virtual std::string id() = 0;
virtual void createBridgeReplica(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
+
+ virtual void removeBridge(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
};
typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index 60befec..e31c153 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -42,13 +42,6 @@ const string StateReplicatorDiscoveryCategory = "BridgeReplicator";
**/
const int SerialCounterStart = 100;
-enum ReplicatedItemLifeCycle
-{
- Created,
- Update,
- Delete
-};
-
/**
*
* Base class for replicated state.
@@ -62,7 +55,6 @@ class ReplicatedStateItem
{
string key; /* unique identifier for this state item */
long serial; /* a version identifier */
- ReplicatedItemLifeCycle lifeCycle = Update;
};
sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index bc70ab5..c5060d0 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -41,11 +41,67 @@ public:
{
}
- void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
+ void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
{
for (Ice::StringSeq::const_iterator k = itemKeys.begin(); k != itemKeys.end(); ++k)
{
- mItems.erase((*k));
+ map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*k));
+ if (entry != mItems.end())
+ {
+ ReplicatedStateItemPtr item = entry->second;
+ mItems.erase(entry);
+ BridgedSessionPtr bridgedSessionItem = BridgedSessionPtr::dynamicCast(item);
+ if (bridgedSessionItem)
+ {
+ vector<BridgeServantPtr> bridges = mManager->getBridges();
+ bool found = false;
+ for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+ {
+ if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
+ {
+ SessionCollectionPtr sessions = (*b)->sessions();
+ sessions->removeSession(bridgedSessionItem);
+ //
+ // Keep the session list clean.
+ //
+ found = true;
+ }
+ //
+ // We could break here if we could be sure that there were no other updates.
+ //
+ }
+ continue;
+ }
+
+ BridgeListenerStateItemPtr bridgeListener = BridgeListenerStateItemPtr::dynamicCast(item);
+ if (bridgeListener)
+ {
+ vector<BridgeServantPtr> bridges = mManager->getBridges();
+ for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+ {
+ if ((*b) && (*b)->id() == bridgeListener->bridgeId)
+ {
+ (*b)->removeListener(bridgeListener);
+ }
+ //
+ // We could break here if we could be sure that there were no other updates.
+ //
+ }
+
+ continue;
+ }
+ BridgeStateItemPtr bridgeItem = BridgeStateItemPtr::dynamicCast(item);
+ if (bridgeItem)
+ {
+ dumpState(cerr, bridgeItem, current.adapter->getCommunicator());
+ mManager->removeBridge(bridgeItem);
+ continue;
+ }
+
+ ///
+ // The bridge manager isn't really removable.
+ //
+ }
}
}
@@ -111,7 +167,8 @@ public:
//
}
- if (!found && bridgeItem->lifeCycle != ReplicatedItemLifeCycle::Delete)
+
+ if (!found)
{
if (existingItem)
{
@@ -160,7 +217,7 @@ public:
{
if ((*b) && (*b)->id() == bridgeListener->bridgeId)
{
- (*b)->updateListener(bridgeListener);
+ (*b)->addListener(bridgeListener);
}
//
// We could break here if we could be sure that there were no other updates.
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index 519fec9..cfd8ac7 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -139,7 +139,10 @@ void SessionCollection::reap()
set<BridgedSessionState> states;
states.insert(BridgedSessionState::Disconnected);
states.insert(BridgedSessionState::Done);
-
+
+ //
+ // Pick out all of the sessions that should be reaped and remove them from the map.
+ //
vector<SessionWrapperPtr> reaped;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -156,36 +159,35 @@ void SessionCollection::reap()
}
}
}
- for (vector<SessionWrapperPtr>::iterator i = reaped.begin(); i != reaped.end(); ++i)
+
+ if (mReplicator)
{
- (*i)->reaped();
+ Ice::StringSeq removedKeys;
+ for (vector<SessionWrapperPtr>::iterator i = reaped.begin(); i != reaped.end(); ++i)
+ {
+ removedKeys.push_back((*i)->id());
+ }
+ mReplicator->removeState(removedKeys);
}
+ //
+ // Unless the wrapper is being held for some other reason, it should be deleted
+ // after it goes out of this scope.
+ //
}
void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
{
SessionWrapperPtr updater;
- bool removal = session->lifeCycle == ReplicatedItemLifeCycle::Delete;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
SessionMap::iterator i = mMap.find(session->key);
if (i == mMap.end())
{
- if (!removal)
- {
- mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
- }
+ mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
}
else
{
- if (!removal)
- {
- updater = i->second;
- }
- else
- {
- mMap.erase(i);
- }
+ updater = i->second;
}
}
if (updater)
@@ -193,3 +195,13 @@ void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
updater->update(session);
}
}
+
+void SessionCollection::removeSession(const BridgedSessionPtr& session)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionMap::iterator i = mMap.find(session->key);
+ if (i != mMap.end())
+ {
+ mMap.erase(i);
+ }
+}
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index ea3aed0..a93f7c2 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -117,6 +117,8 @@ public:
void reap();
void replicaUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& bridgedSession);
+
+ void removeSession(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& bridgedSession);
private:
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 6d08241..e5eeb5e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -29,7 +29,8 @@ SessionWrapper::SessionWrapper(const BridgedSessionPtr& session, const Replicato
const Logger& logger) :
mSession(session),
mReplicator(replicator),
- mLogger(logger)
+ mLogger(logger),
+ mId(mSession->key)
{
}
@@ -158,20 +159,14 @@ bool SessionWrapper::isDestroyed()
return mSession->currentState == BridgedSessionState::Done;
}
-void SessionWrapper::reaped()
+string SessionWrapper::id()
{
- BridgedSessionPtr update;
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSession->lifeCycle = ReplicatedItemLifeCycle::Delete;
- update = createUpdate();
- }
- pushUpdate(update);
+ return mId;
}
void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
{
- if (update)
+ if (update && mReplicator)
{
ReplicatedStateItemSeq seq;
seq.push_back(update);
@@ -197,8 +192,13 @@ void SessionWrapper::unplugMedia()
BridgedSessionPtr SessionWrapper::createUpdate()
{
//
- // While there is a little overhead copying everytime there is an update, this decouples the locking from the replication process.
+ // While there is a little overhead copying everytime there is an update, this decouples the locking from the
+ // replication process.
//
++mSession->serial;
- return new BridgedSession(*mSession.get());
+ if (mReplicator)
+ {
+ return new BridgedSession(*mSession.get());
+ }
+ return 0;
}
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 6d5b5a9..e8e2d66 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -108,14 +108,8 @@ public:
**/
bool isDestroyed();
- /**
- *
- * Just a final stage in this object's lifecycle. This will cause and update to be replicated out to notify that
- * this item is no longer needed.
- *
- **/
- void reaped();
-
+ std::string id();
+
private:
mutable boost::shared_mutex mLock;
@@ -123,6 +117,7 @@ private:
AsteriskSCF::BridgeService::ReplicatorSmartPrx mReplicator;
MediaConnectorPtr mConnector;
AsteriskSCF::System::Logging::Logger mLogger;
+ std::string mId;
/**
* Sends changes to the replication service. This should never occur
commit 5f3eac5c37e27f7d0aaf3b8ee19eae4410c124d7
Author: Brent Eagles <beagles at digium.com>
Date: Wed Mar 2 18:09:21 2011 -0330
- Fixed several replication and related lifecycle issues.
- Fixed a bug introduced in the default bridge listener mechanism when refactoring for replication.
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 22c0c4a..8b16e1b 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -19,19 +19,32 @@ TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
IceBox.InheritProperties=1
IceBox.Service.Logger=${logger_bindir}/server/src at logging-service:createLoggingService
-IceBox.Service.Replicator=../src at BridgeReplicator::create
+IceBox.Service.Replicator=../src at BridgeReplicator:create
IceBox.Service.TestBridge=../src at bridgeservice:create
IceBox.Service.TestBridge2=../src/@bridgeservice:create
IceBox.Service.TestServiceLocator=${service_locator_bindir}/src at service_locator:create
IceBox.Service.TestChannel=${test_channel_bindir}/src at test_channel:create
-IceBox.Service.TestDriver=../test at bridging_unit_test:create
+IceBox.Service.TestDriver=../test at bridge_component_test:create
TestBridge.InstanceName=TestBridge
TestBridge.BridgeService.Endpoints=default -p 55561
+TestBridge.BridgeService.ThreadPool.Size=4
TestBridge.Proxy=BridgeManager:default -p 55561
+TestBridge.BridgeServiceInternal.Endpoints=default -p 65461
+TestBridge.BridgeServiceInternal.ThreadPool.Size=4
+
TestBridge2.InstanceName=TestBridge2
-TestBridge2.BridgeService.Endpoints=default -p 55562
-TestBridge2.Proxy=BridgeManager:default -p 55562
+TestBridge2.BridgeService.Endpoints=default -p 55572
+TestBridge2.BridgeService.ThreadPool.Size=4
+TestBridge2.Proxy=BridgeManager:default -p 55572
+TestBridge2.BridgeServiceInternal.Endpoints=default -p 65462
+TestBridge2.BridgeServiceInternal.ThreadPool.Size=4
+
+Replicator.InstanceName=Replicator
+Replicator.BridgeReplicator.Endpoints=default -p 63242
+Replciator.Proxy=BridgeReplicator:default -p 63242
+Replicator.BridgeReplicator.ThreadPool.Size=4
+
TestChannel.Proxy=TestChannel:default -p 55560
TestUtilAdapter.Endpoints=default -p 55562
Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 61df06c..60bb791 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -53,7 +53,7 @@ using namespace std;
*
*/
-namespace
+namespace
{
/**
@@ -97,9 +97,9 @@ public:
void shutdownImpl(const Ice::Current& current);
void activate(const BridgePrx& proxy);
- BridgeStateItemPtr getState();
void updateState(const BridgeStateItemPtr& state);
- void updateState(const BridgedSessionPtr& sessionState);
+ void updateListener(const BridgeListenerStateItemPtr& update);
+
void activate();
string id();
SessionCollectionPtr sessions();
@@ -158,8 +158,10 @@ private:
IceUtil::Handle<IceUtil::Thread> mShutdownThread;
void statePreCheck();
- void update();
- void updateNoLock();
+ BridgeStateItemPtr createUpdate();
+ void pushUpdate(const BridgeStateItemPtr& update);
+ void pushUpdates(const ReplicatedStateItemSeq& updates);
+ BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
};
typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
@@ -183,6 +185,7 @@ static void checkSessions(const SessionSeq& sessions)
}
}
+
//
// Compiled in constants.
//
@@ -206,6 +209,7 @@ BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
mSessionListener(createSessionListener(mSessions, logger)),
mLogger(logger)
{
+ mLogger(Debug) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin();
i != listeners.end(); ++i)
{
@@ -288,16 +292,20 @@ void BridgeImpl::addSessions(const SessionSeq& sessions, const Ice::Current& cur
{
}
}
-
+
+ BridgeStateItemPtr update;
if (addedSessions.size())
{
mListeners->sessionsAdded(addedSessions);
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ update = createUpdate();
+ }
}
- update();
+ pushUpdate(update);
}
-void BridgeImpl::removeSessions(
- const SessionSeq& sessions, const Ice::Current& current)
+void BridgeImpl::removeSessions(const SessionSeq& sessions, const Ice::Current& current)
{
if (sessions.size() == 0)
{
@@ -306,8 +314,7 @@ void BridgeImpl::removeSessions(
checkSessions(sessions);
SessionSeq removedSessions;
statePreCheck();
- for (SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
{
SessionWrapperPtr session = mSessions->getSession(*i);
if (session)
@@ -334,102 +341,144 @@ void BridgeImpl::removeSessions(
mListeners->sessionsRemoved(removedSessions);
}
- CountIfOperation< Negate <IfStateCriteria> > counter(IfStateCriteria(BridgedSessionState::Done));
- mSessions->visitSessions(counter);
- if (counter.count() < 2)
+ SessionSeq sessionsLeft(mSessions->getSessionSeq());
+ if (sessionsLeft.size() < 2)
{
spawnShutdown();
}
- update();
+ mSessions->reap();
}
SessionSeq BridgeImpl::listSessions(const Ice::Current& current)
{
mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
statePreCheck();
- set< BridgedSessionState > excludedStates;
- excludedStates.insert(BridgedSessionState::Done);
- excludedStates.insert(BridgedSessionState::Disconnected);
- Negate< IfInCriteria< set< BridgedSessionState >, StateMemberSelector > > whereNot(excludedStates);
- SelectOperation< Negate< IfInCriteria< set< BridgedSessionState >, StateMemberSelector > >, SessionPrxSelector, SessionSeq> select(whereNot);
- mSessions->visitSessions(select);
- return select.results();
+ return mSessions->getSessionSeq();
}
void BridgeImpl::shutdown(const Ice::Current& current)
{
//
+ // In an effort to allow some consistency with replicas, the shutdown operation is broken into
+ // two parts. In an asynchronous version, this would probably be a couple of queued tasks.
+ //
+
+ //
// When shutting down, the bridge makes a copy of its current state and unlocks, proceeding with
// no other internal locks.
//
mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mState->runningState == ShuttingDown)
+
+ BridgeStateItemPtr update;
{
- mLogger(Debug) << FUNLOG << ": called when shutting down." ;
- return;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mState->runningState == ShuttingDown)
+ {
+ mLogger(Debug) << FUNLOG << ": called when shutting down." ;
+ return;
+ }
+ if (mState->runningState == Destroyed)
+ {
+ mLogger(Debug) << FUNLOG << ": called when destroyed." ;
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ mState->runningState = ShuttingDown;
+
+ mListeners->stopping();
+ update = createUpdate();
}
- if (mState->runningState == Destroyed)
+ pushUpdate(update);
+ update = 0;
{
- mLogger(Debug) << FUNLOG << ": called when destroyed." ;
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
+ mSessions->visitSessions(shutdownOp);
+ mListeners->stopped();
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
+ mState->runningState = Destroyed;
+ //
+ // Remove references to the session listener implementation.
+ //
+ update = createUpdate();
+ mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+ mSessionListener = 0;
}
- mState->runningState = ShuttingDown;
-
- mListeners->stopping();
-
- ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
- mSessions->visitSessions(shutdownOp);
-
- mLogger(Info) << objectIdFromCurrent(current) << ": is shutdown." ;
- mListeners->stopped();
- mState->runningState = Destroyed;
-
- //
- // Remove references to the session listener implementation.
- //
- mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
- mSessionListener = 0;
-
- updateNoLock();
+ mSessions = 0;
+ pushUpdate(update);
}
void BridgeImpl::destroy(const Ice::Current& current)
{
mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mState->runningState == ShuttingDown)
+ BridgeStateItemPtr update;
{
- mLogger(Debug) << FUNLOG << ": called when shutting down." ;
- throw AsteriskSCF::System::Component::V1::ShuttingDown();
- }
- if (mState->runningState == Destroyed)
- {
- mLogger(Debug) << FUNLOG << ": called when destroyed." ;
- throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mState->runningState == ShuttingDown)
+ {
+ mLogger(Debug) << FUNLOG << ": called when shutting down." ;
+ throw AsteriskSCF::System::Component::V1::ShuttingDown();
+ }
+ if (mState->runningState == Destroyed)
+ {
+ mLogger(Debug) << FUNLOG << ": called when destroyed." ;
+ throw Ice::ObjectNotExistException(__FILE__, __LINE__);
+ }
+ mState->runningState = Destroyed;
+ mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
+ mListeners->stopped();
+ mSessionListener = 0;
}
- mState->runningState = Destroyed;
- mLogger(Info) << objectIdFromCurrent(current) << ": is now destroyed." ;
- mListeners->stopped();
- mSessionListener = 0;
+ pushUpdate(update);
//
// Remove references to the session listener implementation.
//
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
- updateNoLock();
}
-void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current&)
+void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current& current)
{
- mListeners->addListener(listener);
- update();
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current) << " with bridge listener: " <<
+ (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
+ if (!listener)
+ {
+ //
+ // TODO should probably throw an exception here.
+ //
+ return;
+ }
+
+ if (mListeners->addListener(listener))
+ {
+ ReplicatedStateItemSeq seq;
+ seq.push_back(createFirstListenerUpdate(listener));
+ mReplicator->setState(seq);
+ }
}
-void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current&)
+void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Current& current)
{
- mListeners->removeListener(listener);
- update();
+ mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current) << " with bridge listener: " <<
+ (listener ? "(nil)" : mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity()));
+ if (!listener)
+ {
+ //
+ // TODO should probably throw an exception here.
+ //
+ return;
+ }
+ if (mListeners->removeListener(listener))
+ {
+ BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
+ listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+ listenerUpdate->serial = SerialCounterStart + 1; // There are really only two updates for a listener object.
+ listenerUpdate->listener = listener;
+ listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Delete;
+ listenerUpdate->bridgeId = mState->key;
+ ReplicatedStateItemSeq seq;
+ seq.push_back(listenerUpdate);
+ mReplicator->setState(seq);
+ }
}
void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq& newSessions, const Ice::Current& current)
@@ -464,7 +513,6 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
SessionSeq removed;
removed.push_back(oldSession);
mListeners->sessionsRemoved(removed);
- update();
SessionSeq added;
for (SessionSeq::const_iterator i = newSessions.begin(); i != newSessions.end(); ++i)
@@ -520,21 +568,23 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
// Now update the listeners.
//
mListeners->sessionsAdded(added);
- update();
}
bool BridgeImpl::destroyed()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
mLogger(Debug) << FUNLOG << ": " << (mState == Destroyed ? "yes, I am destroyed." : "no, I am not destroyed") ;
- return mState == Destroyed;
+ return mState->runningState == Destroyed;
}
void BridgeImpl::destroyImpl()
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
try
{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mState->runningState = Destroyed;
+ mSessions = 0;
+
mObjAdapter->remove(mPrx->ice_getIdentity());
}
catch (const Ice::Exception&)
@@ -560,22 +610,57 @@ void BridgeImpl::activate(const BridgePrx& proxy)
SessionListenerPrx::uncheckedCast(
mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId))
);
- update();
+
+ ReplicatedStateItemSeq initialUpdates;
+ BridgeStateItemPtr update = createUpdate();
+ update->lifeCycle = ReplicatedItemLifeCycle::Created;
+ initialUpdates.push_back(update);
+
+ vector<BridgeListenerPrx> listeners = mListeners->getListeners();
+ for (vector<BridgeListenerPrx>::iterator i = listeners.begin(); i != listeners.end(); ++i)
+ {
+ initialUpdates.push_back(createFirstListenerUpdate(*i));
+ }
+
+ pushUpdates(initialUpdates);
}
-BridgeStateItemPtr BridgeImpl::getState()
+void BridgeImpl::updateState(const BridgeStateItemPtr& state)
{
mLogger(Debug) << FUNLOG;
- BridgeStateItemPtr result(new BridgeStateItem);
- *result = *mState;
- return result;
+ if (state->lifeCycle == ReplicatedItemLifeCycle::Delete)
+ {
+ destroyImpl();
+ return;
+ }
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ *mState.get() = *state.get();
}
-void BridgeImpl::updateState(const BridgeStateItemPtr& state)
+void BridgeImpl::updateListener(const BridgeListenerStateItemPtr& update)
+{
+ if (update->lifeCycle == ReplicatedItemLifeCycle::Delete)
+ {
+ mListeners->removeListener(update->listener);
+ }
+ else
+ {
+ mListeners->addListener(update->listener);
+ }
+}
+
+void BridgeImpl::activate()
{
mLogger(Debug) << FUNLOG;
boost::unique_lock<boost::shared_mutex> lock(mLock);
- *mState.get() = *state.get();
+ mActivated = true;
+ mPrx = BridgePrx::uncheckedCast(mObjAdapter->add(this, mObjAdapter->getCommunicator()->stringToIdentity(mState->key)));
+ string listenerId = mState->key + ".sessionListener";
+ mSessionListenerPrx = SessionListenerPrx::uncheckedCast(
+ mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId)));
+ //
+ // Now this replica should be ready to go!
+ //
}
string BridgeImpl::id()
@@ -588,13 +673,6 @@ SessionCollectionPtr BridgeImpl::sessions()
return mSessions;
}
-void BridgeImpl::activate()
-{
- mLogger(Debug) << FUNLOG;
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mActivated = true;
-}
-
namespace
{
class ShutdownThread : public IceUtil::Thread
@@ -630,39 +708,59 @@ void BridgeImpl::spawnShutdown()
}
}
-void BridgeImpl::update()
-{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- updateNoLock();
-}
-
-void BridgeImpl::updateNoLock()
-{
- if (mActivated)
- {
- ++mState->serial;
- ReplicatedStateItemSeq seq;
- seq.push_back(getState());
- mReplicator->setState(seq);
- }
-}
-
void BridgeImpl::statePreCheck()
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
- if (mState == ShuttingDown)
+ if (mState->runningState == ShuttingDown)
{
mLogger(Debug) << FUNLOG << ": called when shutting down." ;
throw AsteriskSCF::System::Component::V1::ShuttingDown();
}
- if (mState == Destroyed)
+ if (mState->runningState == Destroyed)
{
mLogger(Debug) << FUNLOG << ": called when destroyed." ;
throw Ice::ObjectNotExistException(__FILE__, __LINE__);
}
}
+BridgeStateItemPtr BridgeImpl::createUpdate()
+{
+ ++mState->serial;
+ BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
+ if (mState->runningState == Destroyed)
+ {
+ mState->lifeCycle = ReplicatedItemLifeCycle::Delete;
+ }
+ return result;
+}
+
+void BridgeImpl::pushUpdate(const BridgeStateItemPtr& update)
+{
+ if (mActivated && update)
+ {
+ ReplicatedStateItemSeq seq;
+ seq.push_back(update);
+ pushUpdates(seq);
+ }
+}
+
+void BridgeImpl::pushUpdates(const ReplicatedStateItemSeq& update)
+{
+ mReplicator->setState(update);
+}
+
+BridgeListenerStateItemPtr BridgeImpl::createFirstListenerUpdate(const BridgeListenerPrx& listener)
+{
+ BridgeListenerStateItemPtr listenerUpdate(new BridgeListenerStateItem);
+ listenerUpdate->key = mState->key + ".listener." + mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
+ listenerUpdate->serial = SerialCounterStart;
+ listenerUpdate->listener = listener;
+ listenerUpdate->lifeCycle = ReplicatedItemLifeCycle::Created;
+ listenerUpdate->bridgeId = mState->key;
+ return listenerUpdate;
+}
+
IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant>
AsteriskSCF::BridgeService::BridgeServant::create(const string& name, const Ice::ObjectAdapterPtr& objectAdapter,
const vector<BridgeListenerPrx>& listeners,
@@ -690,5 +788,8 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state)
{
logger(Debug) << FUNLOG << ": creating replica for " << state->bridgeId;
- return new BridgeImpl(state->bridgeId, objectAdapter, state->listeners, listenerMgr, replicator, state, logger);
+ IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
+ listenerMgr, replicator, state, logger));
+
+ return bridge;
}
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 3e3c865..a6e3523 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -72,7 +72,7 @@ public:
* to listeners and bridged sessions to notify them that the bridge is
* going away will not be made! Use shutdownImpl() for that kind of thing.
*
- **/
+ **/
virtual void destroyImpl() = 0;
/**
@@ -102,11 +102,10 @@ public:
* TODO: complete documentation.
*
**/
-
- virtual AsteriskSCF::Bridge::V1::BridgeStateItemPtr getState() = 0;
-
virtual void updateState(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& state) = 0;
+ virtual void updateListener(const AsteriskSCF::Bridge::V1::BridgeListenerStateItemPtr& state) = 0;
+
virtual std::string id() = 0;
virtual SessionCollectionPtr sessions() = 0;
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 3f6b4a7..91a1eaa 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -105,7 +105,6 @@ private:
BridgeManagerPrx mSourceProxy;
BridgeManagerListenerMgrPtr mListeners;
Logger mLogger;
- vector<BridgeListenerPrx> mDefaultBridgeListeners;
BridgeManagerStateItemPtr mState;
@@ -180,7 +179,7 @@ BridgePrx BridgeManagerImpl::createBridge(const SessionSeq& sessions,
Ice::Identity id(mAdapter->getCommunicator()->stringToIdentity(stringId));
BridgePrx prx(BridgePrx::uncheckedCast(mAdapter->createProxy(id)));
BridgeListenerMgrPtr mgr(new BridgeListenerMgr(mAdapter->getCommunicator(), stringId, prx));
- vector<BridgeListenerPrx> listeners(mDefaultBridgeListeners);
+ vector<BridgeListenerPrx> listeners(mState->defaultBridgeListeners);
if (listener)
{
listeners.push_back(listener);
@@ -256,6 +255,7 @@ void BridgeManagerImpl::addDefaultBridgeListener(const BridgeListenerPrx& newLis
if (mState->defaultBridgeListeners.end() == find_if(mState->defaultBridgeListeners.begin(), mState->defaultBridgeListeners.end(),
IdentityComparePredicate<BridgeListenerPrx>(newListener)))
{
+ mLogger(Debug) << FUNLOG << ": adding new listener " << current.adapter->getCommunicator()->identityToString(newListener->ice_getIdentity());
mState->defaultBridgeListeners.push_back(newListener);
}
update();
@@ -280,7 +280,8 @@ void BridgeManagerImpl::removeDefaultBridgeListener(const BridgeListenerPrx& toR
BridgeSeq BridgeManagerImpl::listBridges(const Ice::Current& current)
{
mLogger(Debug) << FUNLOG << ":" << objectIdFromCurrent(current);
- boost::shared_lock<boost::shared_mutex> lock(mLock);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ reap();
BridgeSeq result;
for (vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end();++i)
{
diff --git a/src/BridgeReplicatorIf.ice b/src/BridgeReplicatorIf.ice
index 0fa5dfe..60befec 100644
--- a/src/BridgeReplicatorIf.ice
+++ b/src/BridgeReplicatorIf.ice
@@ -42,6 +42,13 @@ const string StateReplicatorDiscoveryCategory = "BridgeReplicator";
**/
const int SerialCounterStart = 100;
+enum ReplicatedItemLifeCycle
+{
+ Created,
+ Update,
+ Delete
+};
+
/**
*
* Base class for replicated state.
@@ -55,6 +62,7 @@ class ReplicatedStateItem
{
string key; /* unique identifier for this state item */
long serial; /* a version identifier */
+ ReplicatedItemLifeCycle lifeCycle = Update;
};
sequence<ReplicatedStateItem> ReplicatedStateItemSeq;
@@ -200,15 +208,15 @@ class BridgeStateItem extends ReplicatedStateItem
**/
ServiceState runningState;
- /**
- * These items are replicated separately, but mirrored here.
- * TODO: see if they can be completely removed.
- **/
- BridgedSessionSeq bridgedSessions;
- BridgeListenerSeq listeners;
MediaOperationReplicationPolicy mediaReplicationPolicy;
};
+class BridgeListenerStateItem extends ReplicatedStateItem
+{
+ string bridgeId;
+ AsteriskSCF::SessionCommunications::V1::BridgeListener* listener;
+};
+
/**
*
* The BridgeReplicatorListener interface must be implemented by components
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index 7cef728..bc70ab5 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -57,6 +57,10 @@ public:
ReplicatedStateItemPtr existingItem;
if (entry != mItems.end())
{
+ //
+ // Note: Another serial number situation to consider is two updates of the same key with
+ // the same serial number.
+ //
//
// Look at serial numbers and indicate an out of sequence update. We should
@@ -107,11 +111,12 @@ public:
//
}
- if (!found)
+ if (!found && bridgeItem->lifeCycle != ReplicatedItemLifeCycle::Delete)
{
if (existingItem)
{
- mLogger(Error) << "Replica listener has a bridge object that the bridge manager does not know about. This likely indicates an error and should be investigated.";
+ mLogger(Error) << "Replica listener has a bridge object that the bridge manager "
+ "does not know about. This likely indicates an error and should be investigated.";
}
mManager->createBridgeReplica(bridgeItem);
}
@@ -129,6 +134,9 @@ public:
{
SessionCollectionPtr sessions = (*b)->sessions();
sessions->replicaUpdate(bridgedSessionItem);
+ //
+ // Keep the session list clean.
+ //
found = true;
}
//
@@ -143,6 +151,29 @@ public:
continue;
}
+ BridgeListenerStateItemPtr bridgeListener = BridgeListenerStateItemPtr::dynamicCast((*i));
+ if (bridgeListener)
+ {
+ vector<BridgeServantPtr> bridges = mManager->getBridges();
+ bool found = false;
+ for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
+ {
+ if ((*b) && (*b)->id() == bridgeListener->bridgeId)
+ {
+ (*b)->updateListener(bridgeListener);
+ }
+ //
+ // We could break here if we could be sure that there were no other updates.
+ //
+ }
+ if (!found)
+ {
+ mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+ }
+
+ continue;
+ }
+
mLogger(Info) << "Bridge replicator service received an unrecognized replication item.";
}
}
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4685e05..f9b39d1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -11,6 +11,8 @@ asterisk_scf_component_add_file(bridgeservice BridgeManagerListenerMgr.cpp)
asterisk_scf_component_add_file(bridgeservice BridgeReplicatorStateListenerI.cpp)
asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.h)
asterisk_scf_component_add_file(bridgeservice BridgeManagerImpl.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionListener.cpp)
+asterisk_scf_component_add_file(bridgeservice SessionListener.h)
asterisk_scf_component_add_file(bridgeservice SessionWrapper.cpp)
asterisk_scf_component_add_file(bridgeservice SessionWrapper.h)
asterisk_scf_component_add_file(bridgeservice SessionCollection.cpp)
diff --git a/src/DebugUtil.h b/src/DebugUtil.h
index 7c9d565..b691050 100644
--- a/src/DebugUtil.h
+++ b/src/DebugUtil.h
@@ -109,29 +109,6 @@ std::ostream& dumpState(std::ostream& os, const AsteriskSCF::Bridge::V1::BridgeS
default:
os << "(invalid)\n";
}
- int index = 0;
- for (AsteriskSCF::Bridge::V1::BridgedSessionSeq::const_iterator i = state->bridgedSessions.begin();
- i != state->bridgedSessions.end(); ++i)
- {
- if (index == 0)
- {
- os << "Bridged sessions (" << state->bridgedSessions.size() << "):\n";
- index = 1;
- }
- dumpState(os, "\t", *i, comm);
- }
- index = 0;
- for (AsteriskSCF::Bridge::V1::BridgeListenerSeq::const_iterator i = state->listeners.begin();
- i != state->listeners.end(); ++i)
- {
- if (index == 0)
- {
- os << "Bridge listeners (" << state->listeners.size() << "):\n";
- }
- os << index << ". " << comm->identityToString((*i)->ice_getIdentity()) << '\n';
- ++index;
- }
- index = 0;
os << "Media replication policy: ";
switch (state->mediaReplicationPolicy)
{
diff --git a/src/ListenerManager.h b/src/ListenerManager.h
index c6ef841..c8d6843 100644
--- a/src/ListenerManager.h
+++ b/src/ListenerManager.h
@@ -112,15 +112,19 @@ public:
// NOTE: The current implementation is a little fast and loose here. Inconsistent conditions
// and whatnot are not flagged.
//
- void addListener(const T& listener)
+ bool addListener(const T& listener)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+ bool added = false;
{
- mListeners.push_back(listener);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if(std::find(mListeners.begin(), mListeners.end(), listener) == mListeners.end())
+ {
+ mListeners.push_back(listener);
+ added = true;
+ }
}
- if(mInitialized)
+ if(mInitialized && added)
{
IceStorm::QoS qos;
qos["reliability"] = "ordered";
@@ -129,16 +133,18 @@ public:
{
mTopic->subscribeAndGetPublisher(qos, listener);
}
- catch(const IceStorm::AlreadySubscribed&)
+ catch (const IceStorm::AlreadySubscribed&)
{
//
- // This indicates some kind of inconsistent state.
+ // This indicates some kind of inconsistent state or could
+ // happen if this is a replica.
//
}
}
+ return added;
}
- void removeListener(const T& listener)
+ bool removeListener(const T& listener)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
typename std::vector<T>::iterator i = std::find(mListeners.begin(), mListeners.end(), listener);
@@ -147,9 +153,14 @@ public:
mListeners.erase(i);
if(mInitialized)
{
+ //
+ // unsubscribe doesn't seem to care whether the subscriber was subscribed or not.
+ //
mTopic->unsubscribe(listener);
}
+ return true;
}
+ return false;
}
std::vector<T> getListeners()
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index 4dac11f..519fec9 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -136,32 +136,56 @@ size_t SessionCollection::size()
void SessionCollection::reap()
{
- for (SessionMap::iterator i = mMap.begin(); i != mMap.end();)
+ set<BridgedSessionState> states;
+ states.insert(BridgedSessionState::Disconnected);
+ states.insert(BridgedSessionState::Done);
+
+ vector<SessionWrapperPtr> reaped;
{
- if (i->second->isDestroyed())
- {
- mMap.erase(i++);
- }
- else
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ for (SessionMap::iterator i = mMap.begin(); i != mMap.end();)
{
- ++i;
+ if (states.find(i->second->getBridgedSession()->currentState) != states.end())
+ {
+ reaped.push_back(i->second);
+ mMap.erase(i++);
+ }
+ else
+ {
+ ++i;
+ }
}
}
+ for (vector<SessionWrapperPtr>::iterator i = reaped.begin(); i != reaped.end(); ++i)
+ {
+ (*i)->reaped();
+ }
}
void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
{
SessionWrapperPtr updater;
+ bool removal = session->lifeCycle == ReplicatedItemLifeCycle::Delete;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
SessionMap::iterator i = mMap.find(session->key);
if (i == mMap.end())
{
- mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+ if (!removal)
+ {
+ mMap[session->key] = new SessionWrapper(session, mReplicator, mLogger);
+ }
}
else
{
- updater = i->second;
+ if (!removal)
+ {
+ updater = i->second;
+ }
+ else
+ {
+ mMap.erase(i);
+ }
}
}
if (updater)
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index ec58c22..3f0312b 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -60,8 +60,9 @@ public:
//
// XXX- media pairings.
//
- session->connect();
- mSessions->visitSessions(ConnectSessionOperation(source, mLogger));
+ session->setConnected();
+ ConnectSessionOperation connector(source, mLogger);
+ mSessions->visitSessions(connector);
}
catch (const Ice::Exception& ex)
{
@@ -85,7 +86,8 @@ public:
void ringing(const SessionPrx& source, const Ice::Current&)
{
- mSessions->visitSessions(RingSessionOperation(source, mLogger));
+ RingSessionOperation ringer(source, mLogger);
+ mSessions->visitSessions(ringer);
}
void stopped(const SessionPrx& source, const ResponseCodePtr& response, const Ice::Current& current)
@@ -93,9 +95,10 @@ public:
string proxyString = source->ice_toString();
mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
+ SessionWrapperPtr session;
try
{
- SessionWrapperPtr session = mSessions->getSession(source);
+ session = mSessions->getSession(source);
if (!session)
{
mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 67d03ba..6d08241 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -41,35 +41,40 @@ bool SessionWrapper::isConnected()
void SessionWrapper::connect()
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mSession->currentState == BridgedSessionState::Connected)
+ if (setConnected())
{
- return;
+ mSession->session->connect();
}
-
- //
- // TODO: AMI!
- //
- mSession->session->connect();
- mSession->currentState = BridgedSessionState::Connected;
- pushUpdate();
}
void SessionWrapper::ring()
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- if (mSession->currentState == BridgedSessionState::Connected)
{
- return;
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ if (mSession->currentState == BridgedSessionState::Connected)
+ {
+ return;
+ }
}
mSession->session->ring();
}
-void SessionWrapper::setConnected()
+bool SessionWrapper::setConnected()
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSession->currentState = BridgedSessionState::Connected;
- pushUpdate();
+ BridgedSessionPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ //
+ // TODO: Going from done or disconnected to connected is probably a bug. Investigate!
+ //
+ if (mSession->currentState != BridgedSessionState::Connected)
+ {
+ mSession->currentState = BridgedSessionState::Connected;
+ update = createUpdate();
+ }
+ }
+ pushUpdate(update);
+ return (update != 0);
}
BridgedSessionPtr SessionWrapper::getBridgedSession() const
@@ -86,18 +91,36 @@ SessionPrx SessionWrapper::getSession() const
void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mConnector = connector;
- mSession->currentState = BridgedSessionState::Connected;
- pushUpdate();
+ BridgedSessionPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mSession->currentState == BridgedSessionState::Added)
+ {
+ if (!mConnector)
+ {
+ mConnector = connector;
+ }
+ mSession->currentState = BridgedSessionState::Connected;
+ update = createUpdate();
+ }
+ }
+ pushUpdate(update);
}
void SessionWrapper::disconnect()
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- unplugMedia();
- mSession->currentState = BridgedSessionState::Disconnected;
- pushUpdate();
+ BridgedSessionPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mSession->currentState != BridgedSessionState::Disconnected &&
+ mSession->currentState != BridgedSessionState::Done)
+ {
+ unplugMedia();
+ mSession->currentState = BridgedSessionState::Disconnected;
+ update = createUpdate();
+ }
+ }
+ pushUpdate(update);
}
void SessionWrapper::update(const BridgedSessionPtr& update)
@@ -114,21 +137,19 @@ void SessionWrapper::update(const BridgedSessionPtr& update)
}
}
-void SessionWrapper::pushUpdate()
-{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- ++mSession->serial;
- ReplicatedStateItemSeq seq;
- seq.push_back(mSession);
- mReplicator->setState(seq);
-}
-
void SessionWrapper::destroy()
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- unplugMedia();
- mSession->currentState = BridgedSessionState::Done;
- pushUpdate();
+ BridgedSessionPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mSession->currentState != BridgedSessionState::Done)
+ {
+ unplugMedia();
+ mSession->currentState = BridgedSessionState::Done;
+ update = createUpdate();
+ }
+ }
+ pushUpdate(update);
}
bool SessionWrapper::isDestroyed()
@@ -137,6 +158,27 @@ bool SessionWrapper::isDestroyed()
return mSession->currentState == BridgedSessionState::Done;
}
+void SessionWrapper::reaped()
+{
+ BridgedSessionPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSession->lifeCycle = ReplicatedItemLifeCycle::Delete;
+ update = createUpdate();
+ }
+ pushUpdate(update);
+}
+
+void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+{
+ if (update)
+ {
+ ReplicatedStateItemSeq seq;
+ seq.push_back(update);
+ mReplicator->setState(seq);
+ }
+}
+
void SessionWrapper::unplugMedia()
{
if (mConnector)
@@ -151,3 +193,12 @@ void SessionWrapper::unplugMedia()
}
}
}
+
+BridgedSessionPtr SessionWrapper::createUpdate()
+{
+ //
+ // While there is a little overhead copying everytime there is an update, this decouples the locking from the replication process.
+ //
+ ++mSession->serial;
+ return new BridgedSession(*mSession.get());
+}
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index e8d49c6..6d5b5a9 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -57,10 +57,10 @@ public:
void ring();
/**
- * Simply sets connected state without initiating additional operations.
- * Initiates replication.
+ * Simply sets connected state without initiating additional operations. Returns true if the state was
+ * changed. Initiates replication if the state was changed.
**/
- void setConnected();
+ bool setConnected();
/**
* Accesses the wrapped bridged session. This is a bit more expensive
@@ -108,6 +108,14 @@ public:
**/
bool isDestroyed();
+ /**
+ *
+ * Just a final stage in this object's lifecycle. This will cause and update to be replicated out to notify that
+ * this item is no longer needed.
+ *
+ **/
+ void reaped();
+
private:
mutable boost::shared_mutex mLock;
@@ -120,12 +128,14 @@ private:
* Sends changes to the replication service. This should never occur
* unless the host service is active.
**/
- void pushUpdate();
+ void pushUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& update);
/**
* Disconnection helper.
**/
void unplugMedia();
+
+ AsteriskSCF::Bridge::V1::BridgedSessionPtr createUpdate();
};
typedef IceUtil::Handle<SessionWrapper> SessionWrapperPtr;
diff --git a/test/BridgeManagerListenerI.cpp b/test/BridgeManagerListenerI.cpp
index f180353..2abd677 100644
--- a/test/BridgeManagerListenerI.cpp
+++ b/test/BridgeManagerListenerI.cpp
@@ -27,7 +27,6 @@ BridgeManagerListenerI::BridgeManagerListenerI() :
void BridgeManagerListenerI::bridgeCreated(const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& manager,
const AsteriskSCF::SessionCommunications::V1::BridgePrx& bridge, const Ice::Current&)
{
- std::cerr << "XXX: bridgeCreated" << std::endl;
IceUtil::Monitor<IceUtil::Mutex>::Lock lock(mMonitor);
++mCreated;
mMonitor.notify();
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index e1ba7b2..46cd1f1 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -440,8 +440,8 @@ public:
//
channel.commands()->answer(idA);
channel.commands()->answer(idB);
- mgrPrx->listBridges();
-
+ BridgeSeq bridges = mgrPrx->listBridges();
+ BOOST_CHECK(bridges.size() == 1);
BOOST_CHECK(bridgeListener->addedCount() == 2);
bridge->shutdown();
@@ -450,6 +450,8 @@ public:
BOOST_CHECK(find(log, "stop"));
channel.commands()->getlog(idB, log);
BOOST_CHECK(find(log, "stop"));
+ mgrPrx->removeDefaultBridgeListener(bridgeListenerPrx);
+ mgrPrx->removeListener(listenerPrx);
}
catch (const Ice::Exception& ex)
{
@@ -502,6 +504,7 @@ public:
servant->wait(5000);
BOOST_CHECK(servant->createCalls() == 2);
bridge->shutdown();
+ mgrPrx->removeListener(listenerPrx);
}
catch (const Ice::Exception& ex)
{
@@ -571,8 +574,8 @@ public:
dumplog(log);
sessions = bridge->listSessions();
- BOOST_CHECK(sessions.size() == 2);
- BOOST_CHECK(sessions.back() == b);
+ BOOST_CHECK(sessions.size() == 2); // XXX
+ BOOST_CHECK(sessions.back() == b); // XXX
bridge->shutdown();
}
@@ -638,7 +641,7 @@ public:
//
mgrPrx->listBridges();
- BOOST_CHECK(servant->createCalls() == 1);
+ BOOST_CHECK(servant->createCalls() == 1); // XXX
mgrPrx->removeListener(listenerPrx);
bridge = mgrPrx->createBridge(sessions, 0);
@@ -649,12 +652,13 @@ public:
BOOST_CHECK(servant->createCalls() == 1);
BridgeSeq bridges = mgrPrx2->listBridges();
BridgeSeq bridges2 = mgrPrx->listBridges();
- BOOST_CHECK(bridges.size() == bridges2.size());
+ BOOST_CHECK(bridges.size() == bridges2.size()); // XXX
mgrPrx->addListener(listenerPrx);
bridge = mgrPrx->createBridge(sessions, 0);
servant->wait(5000);
BOOST_CHECK(servant->createCalls() == 2);
bridge->shutdown();
+ mgrPrx->removeListener(listenerPrx);
}
catch (const Ice::Exception& ex)
{
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list